Refcard #227

Reactive Programming in JavaScript With RxJS

Using Streams for Event-Driven Web Applications

Web application code quickly becomes tangled, hard to maintain, and hard to test. The problem is that asynchronous computations are inherently difficult to manage. RxJS solves async without the callback pyramid of doom.


Written By

Luis Atencio
Staff Software Engineer, Citrix Systems
Table of Contents
► Intro ► The problem RxJS solves ► Hello RxJS ► Understanding Streams ► The Observable ► Creating Observables ► The Observer ► The Subscription ► Sequencing with streams ► Dealing with time ► Handling user input ► Handling asynchronous calls ► Disposing of an Observable sequence ► Combining Streams ► Buffering ► Error Handling ► Additional Resources ► Recommended Book
Section 1

Intro

As experienced JavaScript developers, we’re used to dealing with a certain level of asynchronous computation in our code. We’re constantly asked to process user input, fetch data from remote locations, or run long-running computations simultaneously, all without halting the browser. Arguably, these are not trivial tasks, and they certainly require that we step away from the synchronous computation paradigm and into a model where time and latency become key issues. For simple applications, using JavaScript’s main event system directly, or wrapped with the help of a library such as jQuery, is common practice. However, without the proper paradigm in place, scaling the simple code that solves these asynchronous problems into a richer, feature-complete app that meets the needs of the modern web user is still difficult. What we find is that our application code becomes tangled, hard to maintain, and hard to test. The problem is that asynchronous computations are inherently difficult to manage, and RxJS solves this problem.

Section 2

The problem RxJS solves

One of the most important goals of any application is to remain responsive at all times. This means that it’s unacceptable for an application to halt while processing user input or fetching some additional data from the server via AJAX. Generally speaking, the main issue is that IO operations (reading from disk or from the network) are much slower than executing instructions on the CPU. This applies both to the client and the server. Let’s focus on the client. In JavaScript, the solution has always been to hand a long-running task off to the browser, taking advantage of its multiple connections, and to register a callback for the result. Once the task terminates, the JavaScript runtime will invoke the callback with the data. This is a form of inversion of control, as the flow of the program is not driven by you (because you can’t predict when a certain process will complete); instead, it is the responsibility of the runtime to give control back to you. While very useful for small applications, the use of callbacks gets messy with richer applications that need to handle an influx of data coming from the user as well as remote HTTP calls. We’ve all been through this: as soon as you need multiple pieces of data, you begin to fall into the popular “pyramid of doom” or callback hell.

makeHttpCall('/items', 
   items => {
      for (const itemId of items) {
         makeHttpCall(`/items/${itemId}/info`,
           itemInfo => {        
              makeHttpCall(`/items/${itemInfo.pic}`,
                img => {
                    showImg(img);
              });   
           });
      }
});
beginUiRendering();

This code has multiple issues. One of them is style. As you pile more logic into these nested callback functions, this code becomes more complex and harder to reason about. A more subtle issue is created by the for loop. A for loop is a synchronous control flow statement that doesn’t work well with asynchronous calls that have latency: the loop fires off every request without waiting for a response, so the callbacks can complete in any order, which could lead to very strange bugs.

Historically, this has been a very big problem for JavaScript developers, so the language introduced Promises in ES6. Promises help shoulder some of these issues by providing a nice fluent interface that captures the notion of time and exposes a continuation method called then(). The same code above becomes:

makeHttpCall('/items')
    .then(itemId => makeHttpCall(`/items/${itemId}/info`))
    .then(itemInfo => makeHttpCall(`/items/${itemInfo.pic}`))
    .then(showImg);

Certainly this is a step in the right direction. The sheer mental load of reading this code has been reduced dramatically. But Promises have some limitations, as they are really only efficient for working with single-value (or single-error) events. What about handling user input where there’s a constant flow of data? Promises are also insufficient to handle events because they lack semantics for event cancellation, disposal, retries, etc. Enter RxJS.

Section 3

Hello RxJS

RxJS is a library that directly targets problems of an asynchronous nature. Originating from the Reactive Extensions project, it brings the best concepts from the Observer pattern and functional programming together. The Observer pattern is used to provide a proven design based on Producers (the creators of events) and Consumers (the observers listening for said events), which offers a nice separation of concerns.

Moreover, functional programming concepts such as declarative programming, immutable data structures, and fluent method chaining, to name a few, enable you to write code that is very expressive and easy to reason about (bye-bye, callbacks).

For an overview of functional programming concepts, please visit the Functional Programming in JavaScript RefCard.

If you’re familiar with functional programming in JavaScript, you can think of RxJS as the “Underscore.js of asynchronous programming.”

RxJS introduces an overarching data type called the stream.

Section 4

Understanding Streams

Streams are nothing more than a sequence of events over time. Streams can be used to process any type of event, such as mouse clicks, key presses, bits of network data, etc. You can think of streams as variables with the ability to react to changes emitted from the data they point to.

Variables and streams are both dynamic, but behave a bit differently; in order to understand this, let’s look at a simple example. Consider the following simple arithmetic:

var a = 2;
var b = 4;
var c = a + b;
console.log(c); //-> 6

a = 10;  // reassign a
console.log(c); //-> still 6

Even though variable a changed to 10, the values of the other dependent variables remain the same and do not propagate through–by design. This is where the main difference is. A change in an event always gets propagated from the source of the event (producers) down to any parts that are listening (consumers). Hypothetically speaking, if these variables were to behave like streams, the following would occur:

var A$ = 2;
var B$ = 4;
var C$ = A$ + B$;
console.log(C$); //-> 6

A$ = 10;  
console.log(C$); //->  16

As you can see, streams allow you to specify the dynamic behavior of a value declaratively (as a convention, I like to use the $ symbol at the end of stream variable names). In other words, C$ is a specification that concatenates (or adds) the values of streams A$ and B$. As soon as a new value is pushed onto A$, C$ immediately reacts to the change, printing the new value 16. Now, this is a very contrived example and far from actual syntax, but it serves to explain programming with event streams versus variables.
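The propagation idea can be sketched in plain JavaScript, without RxJS. This is only an illustration under stated assumptions: createStream and sumOf are hypothetical helpers invented for this sketch, and "changing" A$ is modeled as pushing a new value onto it, so the running total covers every value seen so far.

```javascript
// Hypothetical helper: a tiny push-based stream (not the RxJS API).
function createStream() {
  const listeners = [];
  return {
    push(value) { listeners.forEach(fn => fn(value)); },
    listen(fn) { listeners.push(fn); }
  };
}

// Hypothetical helper: emits the running sum of every value pushed
// onto any of the input streams.
function sumOf(...streams) {
  const out = createStream();
  let total = 0;
  streams.forEach(s => s.listen(v => { total += v; out.push(total); }));
  return out;
}

const A$ = createStream();
const B$ = createStream();
const C$ = sumOf(A$, B$);

const printed = [];
C$.listen(v => printed.push(v));

A$.push(2);   // C$ emits 2
B$.push(4);   // C$ emits 6
A$.push(10);  // C$ reacts on its own and emits 16

console.log(printed); // [2, 6, 16]
```

Nothing ever asks C$ to recompute; the producers push each change down to the consumer.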

Now let’s begin learning some RxJS.

Section 5

The Observable

Perhaps the most important part of the RxJS library is the declaration of the Observable type. Observables are used to wrap a piece of data (button clicks, key presses, mouse movement, numbers, strings, or arrays) and decorate it with stream-like qualities. The simplest observable you can create is one with a single value, for instance:

var streamA$ = Rx.Observable.of(2);

Let’s revisit the example above, this time using real RxJS syntax. I’ll show you some new APIs that I’ll talk about more in a bit:

const streamA$ = Rx.Observable.of(2);
const streamB$ = Rx.Observable.of(4);
const streamC$ = Rx.Observable.concat(streamA$, streamB$)
  .reduce((x, y) => x  + y);

streamC$.subscribe(console.log); //prints 6

Running this example prints the value 6. Unlike the pseudo code above, I can’t really reassign the value of a stream after it’s been declared because that would just create a new stream; it’s an immutable data type. As a convention, since it’s all immutable, I can safely use the ES6 const keyword to make my code even more robust.

In order to push new values, you will need to modify the declaration of streamA$:

const streamA$ = Rx.Observable.of(2, 10)
...
streamC$.subscribe(console.log); //prints 16

Now, subscribing to streamC$ would generate 16. Like I mentioned before, streams are just sequences of events distributed over time. This is often visualized using a marble diagram:

[Figure: Simple stream]

Section 6

Creating Observables

Observables can be created from a variety of different sources. Here are the more common ones to use:

Observable (static) method: Description
of(arg): Converts arguments to an observable sequence
from(iterable): Converts an array, array-like object, or iterable to an observable sequence
fromPromise(promise): Converts a Promises/A+ spec-compliant Promise and/or ES2015-compliant Promise to an observable sequence
fromEvent(element, eventName): Creates an observable sequence by adding an event listener to the matching DOMElement, jQuery element, Zepto Element, Angular element, Ember.js element, or EventEmitter

The other noticeable difference when programming with streams is the subscription step. Observables are lazy data types, which means that nothing will actually run (no events emitted, for that matter) until a subscriber is attached, as with streamC$.subscribe(...). This subscription mechanism is handled by the Observer.

Section 7

The Observer

Observers represent the consumer side of the model and are in charge of reacting to values produced or emitted by the corresponding Observables. The Observer is a very simple API based on the Iterator pattern that defines a next() function. This function is called on every event that’s pushed onto an observable. Behind the scenes, the shorthand subscription above, streamC$.subscribe(console.log), creates an Observer object that looks like this:

const observer = Rx.Observer.create(
    function next(val) {  
        console.log(val);
    },
    function error(err) { 
        ; // fired in case an exception occurs
    },
    function complete() {
        ; // fired when all events have been processed
    }
);

Observers also specify an API to handle any errors, as well as a means to signal that all the events have been processed. All of the Observer methods are optional; you can subscribe to an observable with just .subscribe(), but the most common approach is to at least provide a single function, which will be mapped to next(). This function is where you would typically perform any effectful computations like writing to a file, logging to the console, appending to the DOM, or whatever tasks you’re required to do.
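To make the observer contract and the lazy subscription step concrete, here is a minimal sketch that runs without RxJS. The of function below is a hypothetical, simplified stand-in written for this illustration, not the library's implementation; it only shows that no value is produced until subscribe() is called and that each observer method is optional.

```javascript
// Hypothetical stand-in for an observable factory (not the RxJS implementation).
function of(...values) {
  return {
    subscribe(observer) {
      // Nothing runs until subscribe() is called: the sequence is lazy.
      try {
        values.forEach(v => observer.next && observer.next(v));
        observer.complete && observer.complete();
      } catch (err) {
        observer.error && observer.error(err);
      }
    }
  };
}

const log = [];
const stream$ = of(2, 4);   // declared, but no events emitted yet
log.push('created');

stream$.subscribe({
  next: val => log.push(`next ${val}`),
  error: err => log.push(`error ${err.message}`),
  complete: () => log.push('complete')
});

of(1).subscribe({});        // every observer method is optional

console.log(log); // ['created', 'next 2', 'next 4', 'complete']
```

The 'created' entry lands in the log before any 'next' entry, which is the laziness described above.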

Section 8

The Subscription

Subscribing to an Observable returns a Subscription object, which you can use to unsubscribe or dispose of the stream at any point in time. This mechanism is really nice because it fills a gap in the native JavaScript event system, where cancelling events and disposing of them correctly has always been problematic.

To show this, I’ll create an observable to listen for all click events:

Obviously, this represents an infinite stream of clicks (the completed signal will never actually fire). If I want to stop listening for events, I can simply call the unsubscribe() method on the Subscription instance returned from the Observable. This will also properly clean up and dispose of any event handlers and temporary objects created.
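A minimal sketch of that subscribe and unsubscribe lifecycle, runnable without RxJS or a browser page. It assumes a runtime that provides the standard EventTarget and Event globals (modern browsers and recent Node.js versions); the fromEvent function here is a hypothetical stand-in written for this illustration, and a bare EventTarget plays the role of the clicked element.

```javascript
// Hypothetical stand-in for fromEvent (not the RxJS implementation):
// subscribing attaches a listener, unsubscribing removes it.
function fromEvent(target, eventName) {
  return {
    subscribe(next) {
      target.addEventListener(eventName, next);
      return {
        unsubscribe() { target.removeEventListener(eventName, next); }
      };
    }
  };
}

const button = new EventTarget();          // plays the role of a DOM element
const clicks$ = fromEvent(button, 'click');

let clickCount = 0;
const subscription = clicks$.subscribe(() => { clickCount += 1; });

button.dispatchEvent(new Event('click'));
button.dispatchEvent(new Event('click'));
subscription.unsubscribe();                // stop listening and clean up
button.dispatchEvent(new Event('click'));  // ignored: the listener is gone

console.log(clickCount); // 2
```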

Now that you know how to create and destroy streams, let’s take a look at how to use them to support any problem domain you’re tackling. I’ll stick to using numbers as my domain model to illustrate these APIs, but of course you can use them to supply the business logic of any domain.

Section 9

Sequencing with streams

At the heart of RxJS is a unified programming model to handle any type of data, whether it’s synchronous (like an array) or asynchronous (like a remote HTTP call). RxJS uses a simple, familiar API based on the functional programming extensions added to JavaScript arrays (known as the Array#extras), with functions such as map, filter, and reduce.

Name: Description
map(fn): Projects each element of an observable sequence into a new form
filter(predicate): Filters the elements of an observable sequence based on a predicate
reduce(accumulator, [seed]): Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value

I’ll begin with arrays. Given an array of numbers from 1 - 5, I will filter out odd numbers, compute their square, and sum them. Using traditional imperative code, this will require the use of at least a loop and a conditional statement. Using the functional Array methods I get:
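A minimal sketch of that array computation; the helper names isEven, square, and add are chosen here for illustration.

```javascript
// Small, side-effect-free helpers (names are illustrative).
const isEven = num => num % 2 === 0;
const square = num => num * num;
const add = (a, b) => a + b;

const numbers = [1, 2, 3, 4, 5];

const result = numbers
  .filter(isEven)   // drop the odd numbers: [2, 4]
  .map(square)      // square what is left: [4, 16]
  .reduce(add, 0);  // sum the squares, starting from 0

console.log(result); // 20
```

No loop or conditional statement appears in the pipeline; each step receives the output of the step before it.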

Now, with very minor changes, this can work with Observable instance methods just as easily. Just like with arrays, each operator called on the Observable receives data from its preceding operator. The goal is to transform the input and apply business logic as needed within the realms of the Observable operators.

You can visualize what’s happening behind the scenes with the following diagram:

[Figure: Filter, Map, Reduce]

Arrays are predictable streams because they are all in memory at the time of consumption. Regardless, if these numbers were computed as the result of an HTTP call (perhaps wrapped in a Promise), the same code would still hold:

Alternatively, of course, I can create independent side-effect-free functions that can be injected into the Observable sequence. This allows your code to look even more declarative:

This is the beauty of RxJS: a single programming model can support all cases. Additionally, observables allow you to sequence and chain operators together very fluently while abstracting the problem of latency and wait-time away from your code. Notice that this way of writing code also eliminates the complexity involved in looping and conditional statements in favor of higher-order functions.

Section 10

Dealing with time

The way RxJS deals with time and latency is easiest to understand through its time-based operators. Here’s a short list of the most common ones used to create Observables with time in them:

Name: Description
Rx.Observable.interval(period): Returns an observable sequence that produces a value after each period
Rx.Observable.timer(dueTime, [period]): Returns an observable sequence that produces a value after dueTime has elapsed and then, if a period is given, after each period

These are very nice for simulating timed events:

[Figure: Interval]

Using interval(500) will emit values every half a second. Because this is an infinite stream, I’m taking only the first 5, converting the Observable into a finite stream that will actually send a completed signal.

Section 11

Handling user input

You can also use Observables to interact with DOM events. Using Rx.Observable.fromEvent I can listen for any DOM events. Here’s a quick example:

I can handle clicks in this case and perform any action within the observer. The map operator is used here to transform the incoming click event, extract the underlying element, and read its href attribute.

Section 12

Handling asynchronous calls

Handling user input is not the only type of asynchronous action you can work with. RxJS also nicely integrates with the ES6 Promise API to fetch remote data. Suppose I need to fetch users from GitHub and extract their login names. The power of RxJS lets me do all of this with just 5 lines of code.

This code introduces a couple of new artifacts, which I’ll explain. First of all, I start with an Observable wrapping GitHub’s users REST API. I flatMap the makeHttpCall function over that URL, which returns a promisified AJAX call. At this point, RxJS will attempt to resolve the promise and wait for its resolution. Upon completion, the response (an array containing user objects) from GitHub is mapped to a function that wraps the single array output back into an Observable, so that I can continue to apply further operations on that data. Lastly, I map a simple function to extract the login attribute over that data.

Map vs FlatMap

As I said before, the map function on Observables applies a function onto the data within it, and returns a new Observable containing that result. The function you call can return any type—even another Observable. In the example above, I pass in a lambda expression that takes a URL and returns an Observable made from the result of a promise:

Mapping this function would yield an observable of observables (this is a very common scenario in functional programming when working with data types called Monads). What we need is to be able to map the function and flatten or join the result back into a single Observable–like peeling back an onion. This is where flatMap comes in. The function above returns an Rx.Observable.fromPromise(...), so I need to flatten this result. As a general rule of thumb, use flatMap when you project an existing observable onto another. Let’s look at a simpler example that’s a bit easier to understand:

This code will generate a window of 5 consecutive numbers every half a second. First, it will print numbers 1-5, then 2-6, 3-7, and so on. This can be represented visually like this:

[Figure: flatMap]
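The difference between mapping and flat-mapping can also be seen with plain arrays, which is a rough, synchronous analogy rather than RxJS code. In this sketch the ticks array stands in for the first three values of an interval, and range is a hypothetical helper that builds a window of consecutive numbers.

```javascript
// Hypothetical helper: `count` consecutive integers starting at `start`.
const range = (start, count) =>
  Array.from({ length: count }, (_, i) => start + i);

const ticks = [0, 1, 2]; // stand-in for the first three interval values

// map keeps the nesting: one inner array per tick.
const nested = ticks.map(x => range(x + 1, 5));

// flatMap maps and then flattens one level into a single sequence.
const flat = ticks.flatMap(x => range(x + 1, 5));

console.log(nested); // [[1, 2, 3, 4, 5], [2, 3, 4, 5, 6], [3, 4, 5, 6, 7]]
console.log(flat);   // [1, 2, 3, 4, 5, 2, 3, 4, 5, 6, 3, 4, 5, 6, 7]
```

The nested result corresponds to an observable of observables, while the flat result corresponds to the single joined sequence.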

Section 13

Disposing of an Observable sequence

Recall that earlier I mentioned that one of the main benefits of RxJS’s abstraction over JavaScript’s event system is the ability to dispose of or cancel events. This responsibility lies with the Observer, which gives you the opportunity to perform your own cleanup. This is done via the Subscription instance you obtain by subscribing to the Observable.

This code creates a simple Observable. But this time, instead of wrapping over a data source such as an event or an AJAX call, I decided to create my own custom event that emits numbers in one-second intervals indefinitely. Providing my own custom event also allows me to define its disposal routine, which is done by the function returned to the subscription. RxJS maps this action to the Subscription.unsubscribe() method. In this case, my cleanup action consists of just clearing the interval function. Instead of printing numbers indefinitely, after 7 seconds, I dispose of the Observable, causing the interval to cease emitting new numbers.
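The mechanics of a custom disposal routine can be sketched without RxJS. In this illustration createObservable is a hypothetical, simplified stand-in (not the library's implementation), activeTimers is bookkeeping added only to make the cleanup visible, and unsubscribe() is called right away instead of after 7 seconds so the sketch finishes immediately.

```javascript
// Hypothetical stand-in for a custom observable factory (not the RxJS
// implementation): the producer may return a function that cleans up.
function createObservable(producer) {
  return {
    subscribe(next) {
      const cleanup = producer({ next });
      let closed = false;
      return {
        unsubscribe() {
          if (closed) return;      // cleanup runs at most once
          closed = true;
          if (cleanup) cleanup();
        }
      };
    }
  };
}

let activeTimers = 0; // bookkeeping only, so the cleanup is observable

const numbers$ = createObservable(observer => {
  let n = 0;
  const id = setInterval(() => observer.next(n++), 1000);
  activeTimers += 1;
  return () => {             // disposal routine for this custom event
    clearInterval(id);
    activeTimers -= 1;
  };
});

const subscription = numbers$.subscribe(console.log);
console.log(activeTimers);   // 1: the interval is running

// In the scenario described above this call would be delayed, for example
// with setTimeout(() => subscription.unsubscribe(), 7000).
subscription.unsubscribe();
console.log(activeTimers);   // 0: the interval was cleared
```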

Section 14

Combining Streams

While Observables might seem heavyweight, they’re actually very cheap to create and dispose of. Just like variables, they can be combined, added, ANDed together, etc. Let’s begin with merging observables.

Merging multiple streams

The merge method combines any number of Observable sequences into a single one. What’s impressive about this operator is that event sequences emitted over time by multiple streams are combined in the correct order. For instance, consider a simple HTML widget with three buttons to perform three actions on a counter: up, down, and clear.

[Figure: Merge]
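The counter widget can be sketched in plain JavaScript to show how merged events keep their arrival order. All helpers below (createStream, map, merge, scan) are hypothetical mini-implementations written for this illustration, not the RxJS operators, and pushing the string 'click' stands in for a real button click.

```javascript
// Hypothetical mini-stream helpers (not the RxJS API).
function createStream() {
  const listeners = [];
  return {
    push: value => listeners.forEach(fn => fn(value)),
    listen: fn => { listeners.push(fn); }
  };
}
function map(source, fn) {
  const out = createStream();
  source.listen(v => out.push(fn(v)));
  return out;
}
function merge(...sources) {
  const out = createStream();
  sources.forEach(s => s.listen(out.push)); // forward events as they arrive
  return out;
}
function scan(source, reducer, seed) {
  const out = createStream();
  let acc = seed;
  source.listen(v => { acc = reducer(acc, v); out.push(acc); });
  return out;
}

const up$ = createStream();
const down$ = createStream();
const clear$ = createStream();

// Each button click becomes a function that updates the count.
const counter$ = scan(
  merge(
    map(up$, () => n => n + 1),
    map(down$, () => n => n - 1),
    map(clear$, () => () => 0)
  ),
  (count, update) => update(count),
  0
);

const counts = [];
counter$.listen(c => counts.push(c));

up$.push('click');
up$.push('click');
down$.push('click');
clear$.push('click');
up$.push('click');

console.log(counts); // [1, 2, 1, 0, 1]
```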

Streams can also be combined with the concat() and concatAll() operators.

Combining one stream with another

The withLatestFrom operator is very useful because it allows you to merge an observable sequence into another by using a selector function, only when the source observable sequence produces an element. To show this, suppose I want to print out the list of GitHub users, one every second. Intuitively, I need to combine a time-based stream with an HTTP stream.

Section 15

Buffering

As I mentioned earlier, streams are stateless data structures, which means state is never held within them but immediately flows from the producers to the consumers. However, once in a while it’s important to be able to temporarily store some data and be able to make decisions based on it. One example that comes to mind is tracking double-clicks on an element. How can you detect a second click action without storing the first one? For this, we can use buffering. There are multiple cases:

Buffering data based on count

You can temporarily hold a fixed amount of data in internal arrays that get emitted as a whole once the count threshold is met.

Rx.Observable.range(1, 9)
  .bufferCount(3)
  .subscribe(console.log);
//-> prints [1, 2, 3]
//          [4, 5, 6]
//          [7, 8, 9]
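For intuition, the same grouping can be sketched over a plain array. The bufferCount function below is a hypothetical array-based analogue written for this illustration, not the RxJS operator; flushing a partially filled final buffer is a choice made for this sketch.

```javascript
// Hypothetical array-based analogue of count-based buffering.
function bufferCount(values, size) {
  const buffers = [];
  let current = [];
  for (const value of values) {
    current.push(value);
    if (current.length === size) { // threshold met: emit the buffer as a whole
      buffers.push(current);
      current = [];
    }
  }
  if (current.length > 0) buffers.push(current); // leftover items, if any
  return buffers;
}

const oneToNine = Array.from({ length: 9 }, (_, i) => i + 1);
const grouped = bufferCount(oneToNine, 3);

console.log(grouped); // [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
```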

Buffering data based on time

You can also buffer for a predefined period of time. To show this I’ll create a simple function that simulates sending emails every second from a set of available email addresses. If I send emails every second, and buffer for, say, 5 seconds, then buffering will emit a group of emails once the buffered time has elapsed:

Section 16

Error Handling

Up until now, you’ve learned different types of asynchronous operations on streams, whether they be on DOM events or fetching remote data. But none of the examples so far have shown you what happens when there’s an error or an exception in the stream pipeline. DOM events are very easy to work with because they won’t actually throw errors. The same can’t be said about AJAX calls. When not dealing with simple arrays, streams are actually highly unpredictable and you must be prepared for the worst.

With errors, if you are passing your own observer, you need to use a try/catch block inside it and call observer.onError(error). This will allow you to catch the error, handle it, and also dispose.

Alternatively, you can use .onErrorResumeNext.

Catch

The good news is that you can continue to use a good ol’ catch block (now an operator) just like you’re used to. To show this, I’ll artificially create an error as soon as a stream sees the value 5.

As soon as the condition is met, the exception is thrown and propagated all the way down to the Observer subscribed to this stream. You might want to gracefully catch the exception and provide a friendly message:

The catch operator allows you to handle the error so that it doesn’t get propagated down to any observers attached to this stream. This operator expects another Observable to carry the baton forward, so you can use this to suggest some default value in case of errors. Notice that, this time, the error function on the observer never actually fired!
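The hand-off described above can be sketched in plain JavaScript. The range, map, and catchWith functions below are hypothetical, simplified stand-ins written for this illustration (not the RxJS implementation), and the fallback value 42 is arbitrary.

```javascript
// Hypothetical, simplified stand-ins (not the RxJS implementation).
function range(start, count) {
  return {
    subscribe(observer) {
      for (let i = 0; i < count; i++) observer.next(start + i);
      observer.complete();
    }
  };
}

function map(source, fn) {
  return {
    subscribe(observer) {
      let failed = false;
      source.subscribe({
        next(value) {
          if (failed) return;              // ignore values after a failure
          let result;
          try {
            result = fn(value);
          } catch (err) {
            failed = true;
            observer.error(err);           // route the exception downstream
            return;
          }
          observer.next(result);
        },
        error: err => { if (!failed) observer.error(err); },
        complete: () => { if (!failed) observer.complete(); }
      });
    }
  };
}

function catchWith(source, fallback) {
  return {
    subscribe(observer) {
      source.subscribe({
        next: v => observer.next(v),
        // Another observable carries the baton forward after an error.
        error: err => fallback(err).subscribe(observer),
        complete: () => observer.complete()
      });
    }
  };
}

const numbers$ = map(range(1, 9), n => {
  if (n === 5) throw new Error('Unexpected value: 5');
  return n;
});

const log = [];
catchWith(numbers$, () => range(42, 1)).subscribe({
  next: v => log.push(v),
  error: err => log.push(`error: ${err.message}`),
  complete: () => log.push('done')
});

console.log(log); // [1, 2, 3, 4, 42, 'done']
```

The observer's error function never appears in the log, because the error was replaced before reaching it.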

Now, if you do want to signal an unrecoverable condition, you can catch and rethrow the error. Within the catch block, this code will actually cause the exception to fire. I would caution against this, as throwing exceptions is a side effect that will ripple through and is expected to be handled somewhere else. This should be used sparingly in truly critical conditions.

Another option is to attempt a retry.

Retries

With observables, you can retry the previous operation for a certain number of times, before the failure is fired.
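The idea behind retrying can be sketched with a synchronous, plain-JavaScript analogy. The withRetry helper is hypothetical and written for this illustration; it is not the RxJS retry operator, and how the operator counts attempts may differ between library versions.

```javascript
// Hypothetical helper: run `operation` once, then up to `retries` more
// times if it throws, before letting the last error through.
function withRetry(operation, retries) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return operation(attempt);
    } catch (err) {
      lastError = err;       // remember the failure and try again
    }
  }
  throw lastError;           // every attempt failed
}

let calls = 0;
const flaky = () => {
  calls += 1;
  if (calls < 3) throw new Error(`attempt ${calls} failed`);
  return 'ok';
};

const result = withRetry(flaky, 3);
console.log(result, calls); // ok 3
```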

As JavaScript developers, our code deals with many events or asynchronous computations all the time. This can get complicated quickly as we build comprehensive UIs or state machines that need to react and stay responsive in the face of failures. RxJS truly embodies two of the most important principles of the Reactive Manifesto, which are Responsive and Resilient.

Moreover, RxJS makes these computations first-class citizens of the language and offers a state-of-the-art event system for JavaScript. This provides a unified computing model that allows for readable and composable APIs to deal with these asynchronous computations, abstracting out the nitty gritty details of latency and wait time.

Section 17

Additional Resources

  • http://rxmarbles.com/
  • http://reactivex.io/
  • http://callbackhell.com/
  • http://xgrommx.github.io/rx-book/
  • https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
  • https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
  • https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines
Section 18

Recommended Book

Functional Programming in JavaScript teaches JavaScript developers functional techniques that will improve extensibility, modularity, reusability, testability, and performance. Through concrete examples and jargon-free explanations, this book teaches you how to apply functional programming to real-life JavaScript development tasks. The book includes insightful comparisons to object-oriented or imperative programming, which will allow you to ease into functional design. Moreover, you’ll learn a repertoire of techniques and design patterns including function chaining and pipelining, recursion, currying, binding, functional composition, lazy evaluation, fluent error handling, memoization, and much more. By the end of the book, you’ll think about application design in a fresh new way.

