Over a million developers have joined DZone.
Refcard #227

Reactive Programming in JavaScript With RxJS

Using Streams for Event-Driven Web Applications

Written by

Luis Atencio Staff Software Engineer, Citrix Systems

Web application code quickly becomes tangled, hard to maintain, and hard to test. The problem is that asynchronous computations are inherently difficult to manage. RxJS solves async without the callback pyramid of doom.

Free PDF
DOWNLOAD
Section 1

Intro

As experienced JavaScript developers, we’re used to dealing with a certain level of asynchronous computations in our code. We’re constantly asked to process user input, fetch data from remote locations, or run long-running computations simultaneously, all without halting the browser. Arguably, these are not trivial tasks and certainly require that we learn to step away from the synchronous computation paradigm, and step into a model where time and latency become key issues. For simple applications, using JavaScript’s main event system directly or even wrapped with the help of jQuery libraries is common practice. However, without the proper paradigm in place, scaling the simple code that solves these asynchronous problems to a richer and feature-complete app that meets the needs of the modern day web user, is still difficult. What we find is that our application code becomes tangled, hard to maintain, and hard to test. The problem is that asynchronous computations are inherently difficult to manage, and RxJS solves this problem.

Section 2

The problem RxJS solves

One of the most important goals of any application is to remain responsive at all times. This means that it’s unacceptable for an application to halt while processing user input or fetching some additional data from the server via AJAX. Generally speaking, the main issue is that IO operations (reading from disk or from the network) are much slower than executing instructions on the CPU. This applies both to the client and the server. Let’s focus on the client. In JavaScript, the solution has always been to take advantage of the browser’s multiple connections and use callbacks to spawn a separate process that takes care of some long-running task. Once the task terminates, the JavaScript runtime will invoke the callback with the data. This a form of inversion of control, as the control of the program is not driven by you (because you can’t predict when a certain process will complete), but rather under the responsibility of the runtime to give it back to you. While very useful for small applications, the use of callbacks gets messy with richer applications that need to handle an influx of data coming from the user as well as remote HTTP calls. We’ve all been through this: as soon as you need multiple pieces of data you begin to fall into the popular “pyramid of doom” or callback hell.

makeHttpCall('/items', 
   items => {
      for (itemId of items) {
         makeHttpCall(`/items/${itemId}/info`,
           itemInfo => {        
              makeHttpCall(`/items/${itemInfo.pic}`,
                img => {
                    showImg(img);
              });   
           });
      }
});
beginUiRendering();

This code has multiple issues. Once of them is style. As you pile more logic into these nested callback functions, this code becomes more complex and harder to reason about. A more subtle issue is created by the for loop. A for loop is a synchronous control flow statement that doecesn’t work well with asynchronous calls that have latency, which could lead to very strange bugs.

Historically, this has been a very big problem for JavaScript developers, so the language introduced Promises in ES6. Promises help shoulder some of these issues by providing a nice fluent interface that captures the notion of time and exposes a continuity method called then(). The same code above becomes:

makeHttpCall('/items')
    .then(itemId => makeHttpCall(`/items/${itemId}/info`))
    .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`))
    .then(showImg);

Certainly this is a step in the right direction. The sheer mental load of reading this code has reduced dramatically. But Promises have some limitations, as they are really efficient for working with single value (or single error) events. What about handling user input where there’s a constant flow of data? Promises are also insufficient to handle events because they lack semantics for event cancellation, disposal, retries, etc. Enter RxJS.

Section 3

Hello RxJS

RxJS is a library that directly targets problems of an asynchronous nature. Originating from the Reactive Extensions project, it brings the best concepts from the Observer pattern and functional programming together. The Observer pattern is used to provide a proven design based on Producers (the creators of event) and Consumers (the observers listening for said events), which offers a nice separation of concerns.

Moreover, functional programming concepts such as declarative programming, immutable data structures, and fluent method chaining—to name a few— enable you to write very expressive and easy to reason about code (bye-bye, callbacks).

For an overview of functional programming concepts, please visit the Functional Programming in JavaScript RefCard.

If you’re familiar with functional programming in JavaScript, you can think of RxJS as the “Underscore.js of asynchronous programming.”

RxJS introduces an overarching data type called the stream.

Section 4

Understanding Streams

Streams are nothing more than a sequence of events over time. Streams can be used to process any of type of event such as mouse clicks, key presses, bits of network data, etc. You can think of streams as variables that with the ability to react to changes emitted from the data they point to.

Variables and streams are both dynamic, but behave a bit differently; in order to understand this, let’s look at a simple example. Consider the following simple arithmetic:

var a = 2;
var b = 4;
var c = a + b;
console.log(c); //-> 6

a = 10;  // reassign a
console.log(c); //-> still 6

Even though variable a changed to 10, the values of the other dependent variables remain the same and do not propagate through–by design. This is where the main difference is. A change in an event always gets propagated from the source of the event (producers) down to any parts that are listening (consumers). Hypothetically speaking, if these variables were to behave like streams, the following would occur:

var A$ = 2;
var B$ = 4;
var C$ = A$ + B$;
console.log(C$); //-> 6

A$ = 10;  
console.log(C$); //->  16

As you can see, streams allow you to specify the dynamic behavior of a value declaratively (as a convention, I like to use the $ symbol in front of stream variables). In other words, C$ is a specification that concatenates (or adds) the values of streams A$ and B$. As soon as a new value is pushed onto A$, C$ immediately reacts to the change printing the new value 16. Now, this is a very contrived example and far from actual syntax, but it serves to explain programming with event streams versus variables.

Now let’s begin learning some RxJS.

Section 5

The Observable

Perhaps the most important part of the RxJS library is the declaration of the Observable type. Observables are used to wrap a piece of data (button clicks, key presses, mouse movement, numbers, strings, or arrays) and decorate it with stream-like qualities. The simplest observable you can create is one with a single value, for instance:

var streamA$ = Rx.Observable.of(2);

Let’s revisit the example above, this time using real RxJS syntax. I’ll show you some new APIs that I’ll talk about more in a bit:

const streamA$ = Rx.Observable.of(2);
const streamB$ = Rx.Observable.of(4);
const streamC$ = Rx.Observable.concat(streamA$, streamB$)
  .reduce((x, y) => x  + y);

streamC$.subscribe(console.log); //prints 6

Running this example prints the value 6. Unlike the pseudo code above, I can’t really reassign the value of a stream after its been declared because that would just create a new stream–it’s an immutable data type. As a convention, since it’s all immutable I can safely use the ES6 const keyword to make my code even more robust.

In order to push new values, you will need to modify the declaration of streamA$:

const streamA$ = Rx.Observable.of(2, 10)
...
streamC$.subscribe(console.log); //prints 16

Now, subscribing to streamC$ would generate 16. Like I mentioned before, streams are just sequences of events distributed over time. This is often visualized using a marble diagram:

Simple stream

Section 6

Creating Observables

Observables can be created from a variety of different sources. Here are the more common ones to use:

Source Observable (static) method
of(arg) Converts arguments to an observable sequence
from(iterable) Converts arguments to an observable sequence
fromPromise(promise) Converts a Promises/A+ spec-compliant Promise and/or ES2015-compliant Promise to an observable sequence
fromEvent(element, eventName) Creates an observable sequence by adding an event listener to the matching DOMElement, jQuery element, Zepto Element, Angular element, Ember.js element, or EventEmitter

The other noticeable difference when programming with streams is the subscription step. Observables are lazy data types, which means that nothing will actually run (no events emitted, for that matter) until a subscriber streamC$.subscribe(...) is attached. This subscription mechanism is handled by Observer.

Section 7

The Observer

Observers represent the consumer side of the model and are in charge of reacting to values produced or emitted by the corresponding Observables. The Observer is a very simple API based on the Iterator pattern that defines a next() function. This function is called on every event that’s pushed onto an observable. Behind the scenes, the shorthand subscription above streamC$.subscribe(console.log) creates an Observer object behind the scenes that looks like this:

const observer = Rx.Observer.create(
    function next(val) {  
        console.log(val);
    },
    function error(err) { 
        ; // fired in case an exception occurs
    },
    function complete() {
        ; // fired when all events have been processed
    }
);

Observers also specify an API to handle any errors, as well as a means to signal that all the events have been processed. All of the Observer methods are optional, you can subscribe to an observable with just .subscribe(), but the most common approach is to at least provide a single function which will be mapped to next(). In this function is where you would typically perform any effectful computations like writing to a file, logging to the console, appending to the DOM, or whatever tasks you’re required you to do.

Section 8

The Subscription

Subscribing to an Observable returns a Subscription object, which you can use to unsubscribe or dispose of the stream at ant point in time. This mechanism is really nice because it fills in the gap of the native JavaScript system, where cancelling events and disposing of them correctly has always been problematic.

To show this, I’ll create an observable to listen for all click events:

Obviously, this represents an infinite stream of clicks (the completed signal will never actually fire). If I want to stop listening for events, I can simply call the unsubscribe() method on the Subscription instance returned from the Observable. This will also properly clean up and dispose of any event handlers and temporary objects created.

Now that you know how to create and destroy streams, let’s take a look at a how to use them to support any problem domain you’re tackling. I’ll stick to using numbers as my domain model to illustrate these APIs, but of course you can use them to supply the business logic of any domain.

Section 9

Sequencing with streams

At the heart of RxJS is to provide a unified programming model to handle any type of data, whether it’s synchronous (like an array) or asynchronous (remote HTTP call). RxJS uses a simple, familiar API based on the functional programming extensions added to JavaScript arrays (known as the Array#extras) with functions: map, filter, and reduce.

name Description
map(fn) Projects each element of an observable sequence into a new form
filter(predicate) Filters the elements of an observable sequence based on a predicate
reduce(accumulator, [seed]) Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value

I’ll begin with arrays. Given an array of numbers from 1 - 5, I will filter out odd numbers, compute their square, and sum them. Using traditional imperative code, this will require the use of at least a loop and a conditional statement. Using the functional Array methods I get:

Now, with very minor changes, this can work with Observables instance methods just as easily. Just like with arrays, the operators called on the Observable receive data from its preceding operator. The goal is to transform the input and apply business logic as needed within the realms of the Observable operators.

You can visualize what’s happening behind the scenes with the following diagram:

Filter, Map, Reduce

Arrays are predictable streams because they are all in memory at the time of consumption. Regardless, if these numbers were computed as the result of an HTTP call (perhaps wrapped in a Promise), the same code would still hold:

Alternatively, of course, I can create independent side-effect-free functions that can be injected into the Observable sequence. This allows your code to look even more declarative:

This is the beauty of RxJS: a single programming model can support all cases. Additionally, observables allow you to sequence and chain operators together very fluently while abstracting the problem of latency and wait-time away from your code. Notice also that this way of writing code also eliminates the complexity involved in looping and conditional statements in favor of higher-order functions.

Section 10

Dealing with time

Understanding how RxJS effectively deals with the time and latency can be done via its time-based operators. Here’s a short list of the most common ones used to create Observables with time in them:

name Description
Rx.Observable.interval(period) Returns an observable sequence that produces a value after each period
Rx.Observable.timer(dueTime) Returns an observable sequence that produces a value after dueTime has elapsed and then after each period

These are very nice for simulating timed events:

Interval

Using interval(500) will emit values every half a second. Because this is an infinite stream, I’m taking only the first 5, converting the Observable into a finite stream that will actually send a completed signal.

Section 11

Handling user input

You can also use Observables to interact with DOM events. Using Rx.Observable.fromEvent I can listen for any DOM events. Here’s a quick example:

I can handle clicks in this case and perform any action within the observer. The map operator is used here to transform the incoming click event, extract the underlying element, and read its href attribute.

Section 12

Handling asynchronous calls

Handling user input is not the only type of asynchronous actions you can work with. RxJS also nicely integrates with the ES6 Promise API to fetch remote data. Suppose I need to fetch users from Github and extract their login names. The power of RxJS lets me do all of this with just 5 lines of code.

This code introduces a couple of new artifacts, which I’ll explain. First of all, I start with an Observable wrapping Github’s users REST API. I flatMap the makeHttpCall function over that URL, which returns a promisified AJAX call. At this point, RxJS will attempt to resolve the promise and wait for its resolution. Upon completion, the response (an array containing user objects) from Github is mapped to a function that wraps the single array output back into an Observable, so that I can continue to apply further operations on that data. Lastly, I map a simple function to extract the login attribute over that data.

Map vs FlatMap

As I said before, the map function on Observables applies a function onto the data within it, and returns a new Observable containing that result. The function you call can return any type—even another Observable. In the example above, I pass in a lambda expression that takes a URL and returns an Observable made from the result of a promise:

Mapping this function would yield an observable of observables (this is a very common scenario in functional programming when working with data types called Monads). What we need is to be able to map the function and flatten or join the result back into a single Observable–like peeling back an onion. This is where flatMap comes in. The function above returns an Rx.Observable.fromPromise(...), so I need to flatten this result. As a general rule of thumb, use flatMap when you project an existing observable onto another. Let’s look at a simpler example that’s a bit easier to understand:

This code will generate a window of 5 consecutive numbers every half a second. First, it will print numbers 1-5, then 2-6, 3-7, and so on. This can be represented visually like this:

Flatmap

Section 13

Disposing of an Observable sequence

Recall earlier, I mentioned that one of the main benefits of RxJS’s abstraction over JavaScript’s event system is the ability to dispose or cancel events. This responsibility lies with the Observer, which gives you the opportunity to perform your own cleanup. This is done via the Subscription instance you obtain by subscribing to the Observable.

This code creates a simple Observable. But this time, instead of wrapping over a data source such as an event or an AJAX call, I decided to create my own custom event that emits numbers in one second intervals indefinitely. Providing my own custom event also allows me to define its disposal routine, which is done by the function returned to the subscription. RxJS maps this action to the Subscription.unsubcribe() method. In this case, my cleanup action consists of just clearing the interval function. Instead of printing numbers indefinitely, after 7 seconds, I dispose of the Observable, causing the interval to cease emitting new numbers.

Section 14

Combining Streams

While Observables might seem heavyweight, they’re actually very cheap to create and dispose of. Just like variables, they can be combined, added, ANDed together, etc. Let’s begin with merging observables.

Merging multiple streams

The merge method combines any Observable sequences into a single one. What’s impressive about this operator is that event sequences emitted over time by multiple streams are combined in the correct order. For instance, consider a simple HTML widget with three buttons to perform three actions on a counter: up, down, and clear.

Merge

Other means of combining streams can be done via the concat() and concatAll() operators.

Combining one stream with another

The withLatestFrom operator is very useful because it allows you to merge an observable sequence into another by using a selector function, only when the source observable sequence produces an element. To show this, suppose I want to print out the list of GitHub users, one every second. Intuitively, I need to combine a time-based stream with an HTTP stream.

Section 15

Buffering

As I mentioned earlier, streams are stateless data structures, which means state is never held within them but immediately flows from the producers to the consumers. However, once in a while it’s important to be able to temporarily store some data and be able to make decisions based on it. One example that comes to mind is tracking double-clicks on an element. How can you detect a second click action without storing the first one? For this, we can use buffering. There are multiple cases:

Buffering for a certain amount of time

You can temporarily hold a fixed amount of data into internal arrays that get emitted as a whole once the count threshold is met. 

Rx.Observable.range(1, 9) .bufferCount(3)
.subscribe(console.log);
//-> prints [1, 2, 3]
//			[4, 5, 6] 
//			[7, 8, 9]

Buffering data based on time

You can also buffer for a predefined period of time. To show this I’ll create a simple function that simulates sending emails every second from a set of available email addresses. If I send emails every second, and buffer for, say, 5 seconds, then buffering will emit a group of emails once the buffered time has elapsed:

Section 16

Error Handling

Up until now, you’ve learned different types of asynchronous operations on streams, whether they be on DOM events or fetching remote data. But none of the examples so far have shown you what happens when there’s an error or an exception in the stream pipeline. DOM events are very easy to work with because they won’t actually throw errors. The same can’t be said about AJAX calls. When not dealing with simple arrays, streams are actually highly unpredictable and you must be prepared for the worst.

With errors, if you are passing your own observer, you need to try catch inside and call observer.onError(error); This will allow you to catch the error , handle it, and also dispose.

Alternatively, you can use .onErrorResumeNext

Catch

The good news is that you can continue to use a good ’ol catch block (now an operator) just like you’re used to. To show this I’ll artificially create an error as soon as a stream sees the value 5.

As soon as the condition is met, the exception is thrown and propagated all the way down to the Observer subscribed to this stream. You might want to gracefully catch the exception and provide a friendly message:

The catch operator allows you to handle the error so that it doesn’t get propagated down to any observers attached to this stream. This operator expects another Observable to carry the baton forward, so you can use this to suggest some default value in case of errors. Notice that, this time, the error function on the observer never actually fired!

Now, if we do want to signal an unrecoverable condition, you can catch and throw the error. Within the catch block, this code will actually cause the exception to fire. I would caution against this as throwing exceptions is a side effect that will ripple through and is expected to be handled somewhere else. This should be used sparingly in truly critical conditions.

Another option is to attempt a retry.

Retries

With observables, you can retry the previous operation for a certain number of times, before the failure is fired.

##.finally() As JavaScript developers, our code deals with many events or asynchronous computations all the time. This can get complicated quickly as we build comprehensive UIs or state-machines that need to react and keep responsive in the face of failures. RxJS truly embodies two of the most important principles of the Reactive Manifesto, which are Responsive and Resilient.

Moreover, RxJS makes these computations first-class citizens of the language and offers a state-of-the-art event system for JavaScript. This provides a unified computing model that allows for readable and composable APIs to deal with these asynchronous computations, abstracting out the nitty gritty details of latency and wait time.

Section 18

Recommended Book

Functional Programming in JavaScript teaches JavaScript developers functional techniques that will improve extensibility, modularity, reusability, testability, and performance. Through concrete examples and jargon-free explanations, this book teaches you how to apply functional programming to real-life JavaScript development tasks. The book includes insightful comparisons to object-oriented or imperative programming, which will allow you to ease into functional design. Moreover, you’ll learn a repertoire of techniques and design patterns including function chaining and pipelining, recursion, currying, binding, functional composition, lazy evaluation, fluent error handling, memoization, and much more. By the end of the book, you’ll think about application design in a fresh new way.

Visit this link to get your copy now.

Publications

  • Featured
  • Latest
  • Popular
DOWNLOAD
Design Patterns
Learn design patterns quickly with Jason McDonald's outstanding tutorial on the original 23 Gang of Four design patterns, including class diagrams, explanations, usage info, and real world examples.
198.9k 535.8k
DOWNLOAD
Core Java
Gives you an overview of key aspects of the Java language and references on the core library, commonly used tools, and new Java 8 features.
122.2k 321.6k
DOWNLOAD
Getting Started with Ajax
Introduces Ajax, a group interrelated techniques used in client-side web development for creating asynchronous web applications.
100.4k 196.6k
DOWNLOAD
Getting Started with Git
This updated Refcard explains why so many developers are migrating to this exciting platform. Learn about creating a new Git repository, cloning existing projects, the remote workflow, and more to pave the way for limitless content version control.
108.7k 241.8k
DOWNLOAD
Spring Configuration
Catalogs the XML elements available as of Spring 2.5 and highlights those most commonly used: a handy resource for Spring context configuration.
101.7k 254k
DOWNLOAD
Core CSS: Part I
Covers Core principles of CSS that will expand and strengthen your professional ability to work with CSS. Part one of three.
88.7k 191.5k
DOWNLOAD
jQuery Selectors
Introduces jQuery Selectors, which allow you to select and manipulate HTML elements as a group or as a single element in jQuery.
92k 347.8k
DOWNLOAD
Foundations of RESTful Architecture
Introduces the REST architectural style, a worldview that can elicit desirable properties from the systems we deploy.
90.6k 132.9k
DOWNLOAD
The Ultimate Scrum Reference Card
Provides a concise overview of roles, meetings, rules, and artifacts within a Scrum organization. Updated for 2016.
84.5k 222.5k
DOWNLOAD
Core Java Concurrency
Helps Java developers working with multi-threaded programs understand the core concurrency concepts and how to apply them.
88.1k 179.3k
DOWNLOAD
Core CSS: Part II
Covers Core principles of CSS that will expand and strengthen your professional ability to work with CSS. Part two of three.
72.3k 137.8k
DOWNLOAD
Getting Started with Eclipse
Gives insights on Eclipse, the leading IDE for Java, which has a rich ecosystem of plug-ins and an open-source framework that supports other languages.
72k 182.6k
{{ card.title }}
{{card.downloads | formatCount }} {{card.views | formatCount }}

The best of DZone straight to your inbox.

SEE AN EXAMPLE
Please provide a valid email address.

Thanks for subscribing!

Awesome! Check your inbox to verify your email so you can start receiving the latest in tech news and resources.
Subscribe

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}