RxJS First Steps: Subject and ReplaySubject

We take a look at a very specific part of RxJS 5, namely Subject and ReplaySubject by implementing a simple publish/subscriber mechanism.

I'm not sure whether you've already heard about RxJS or Reactive Programming in general, but there's currently a very strong movement toward such a programming style in the modern JavaScript world, so you should definitely check it out. Here we're going to take a look at a very specific part of RxJS 5, namely Subject and ReplaySubject by implementing a simple publish/subscriber mechanism. (TL;DR: check out the screencast at the end)

So a couple of days ago I read this Tweet:

I totally agree, RxJS has a "learning cliff!"

But, I think Cecil’s answer is an awesome way to approach this initial complexity. Take what you need, learn about it, use it, and move on. Don’t worry about understanding everything at once (it’s overwhelming, I promise!). Coming from the .NET world, I remember when I first got in touch with Linq, a querying language built into .NET that allows you to write really powerful queries over collections, even over database objects (via appropriate adapters). It seemed so odd and complex, but once you grasp it, it’s just mind-blowingly powerful. I have the feeling the same holds for RxJS.

What I Needed: A Broadcasting Mechanism

So, I had an Angular 1.x application and needed a way for other (potentially lazy-loaded) components to get notified about certain events happening within the application. For example, whenever the user executes a search through the application’s search component, I want such modules to be able to subscribe to a “search event” and then get invoked with the search results.

Angular 1? Why don’t you simply use $rootScope.$emit(...) as the broadcasting mechanism? Sure thing, that would totally work. But under the assumption that I’ll upgrade to Angular 2 sooner or later, I’m trying to avoid the $scope as much as possible. It also has other side effects.

RxJS is made for this, right? There’s a stream of data (my broadcast values), and there are so-called Observables to which you can subscribe and get updated about new values. I always wanted to experiment around with RxJS on a concrete example, so it was time.

First Things First. What Is RxJS? What Am I Talking About?

RxJS is the JavaScript implementation of ReactiveX.

ReactiveX is more than an API, it's an idea and a breakthrough in programming. It has inspired several other APIs, frameworks, and even programming languages. reactivex.io

It implements concepts from the popular observer pattern, iterator pattern, and functional programming. Usually, using a reactive extension library consists of creating some kind of event stream, then combining/transforming those streams with query-like operators, and finally listening by subscribing to those resulting streams for performing operations. Browse the official site for more details.

It was initially popularized by Microsoft and published under the Reactive-Extensions GitHub repository, containing various language-specific implementations. In fact, when you Google for RxJS (the JavaScript implementation of reactive extensions), you most likely land on the GitHub repo of RxJS 4. Recently, this library has been rewritten from the ground up with performance in mind—the result is RxJS 5 (beta). The core contributor here is Ben Lesh, senior software engineer at Netflix, which is a huge consumer of Rx (and stands for everything performance related).

With Angular 2, Google also jumped onto reactive extensions. Angular 2 makes heavy use of RxJS 5; for instance, the provided HTTP library returns Rx Observables by default, rather than Promises as you might expect.

Also, Twitter is an awesome source of information (at least for me). So if you want to learn more about reactive programming, I recommend following these people:

  • Ben Lesh - Software Engineer at Netflix in charge of RxJS 5
  • Andre Staltz - A reactive programming and functional programming expert who has written THE introduction to reactive programming (see link at end of this article)
  • Michel Weststrate - Creator of MobX, a state management library that embraces reactive programming like no other lib
  • Rob Wormald - Developer advocate at Google on the Angular team and big proponent of reactive programming. He also created ngrx, a project that started as a Redux inspired library built with RxJS and evolved to a collection of reactive extensions for Angular 2.
  • Victor Savkin - Developer advocate at Google on the Angular team. On his blog, he writes interesting articles, also on reactive programming, in particular, this one.
  • And probably many others. Let me know and I’ll list them here.

Subject and ReplaySubject

Rx.Subject: “Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers.”

That sounds good for our broadcasting mechanism, right? We’re not so much interested in the “observer” part of the Subject but in the fact that it is an “observable sequence”. Great, so let’s create a new instance of it.

var broadcast = new Rx.Subject();

From here you can start emitting new values using the next(..) function, like

broadcast.next('Hi there');

Now obviously someone needs to “listen” to these broadcasts and do something with these values. That’s what we call “subscribe.”

var someSubscriber = broadcast
      .subscribe(function(value) {
        console.log('Got value: ' + value);
      });

That was easy, right? Obviously, we can broadcast any kind of value we need. Great! But, now we’re going to make things a bit more interesting and realistic. Our app subscribers may come and go at different times. So, I created a simple UI that allows us to simulate such behavior. Something like this:

We don’t have to change much in our code, just some HTML, and we move the value emitting inside a button click:

addClickListener('broadcastValue', function() {
  broadcast.next('Broadcasting..' + Math.round((Math.random() * 100)));
});

Also, the subscribers are registered when the corresponding button is clicked:

addClickListener('subs1', function() {
  broadcast.subscribe(function(value) {
    print('Subs1 got ' + value);
  });
});

By the way, addClickListener is just a helper function I created.

Note, if you just click the “Broadcast value” button without registering a subscriber, nothing happens. Once you start clicking a subscriber or two, they’ll start receiving the values and start printing them out.
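
To see why values broadcast before anyone subscribes simply disappear, it can help to picture what a Subject roughly does internally. The following is only a minimal sketch in plain JavaScript, not RxJS's actual implementation; createMiniSubject, miniBroadcast, and log are hypothetical names made up for this illustration.

```javascript
// Hypothetical stand-in for Rx.Subject, for illustration only.
// It keeps a list of listeners and forwards each value to all of them.
function createMiniSubject() {
  var observers = [];
  return {
    // Register a callback that receives every value emitted from now on.
    subscribe: function (observer) {
      observers.push(observer);
    },
    // Broadcast a value to everyone who is subscribed right now.
    next: function (value) {
      observers.forEach(function (observer) {
        observer(value);
      });
    }
  };
}

var log = [];
var miniBroadcast = createMiniSubject();

miniBroadcast.next('first');   // nobody is listening yet, so this value is lost

miniBroadcast.subscribe(function (value) {
  log.push('Subs1 got ' + value);
});
miniBroadcast.subscribe(function (value) {
  log.push('Subs2 got ' + value);
});

miniBroadcast.next('second');  // both subscribers receive this one

console.log(log);
```

Nothing is stored between calls to next(), which is exactly why a late subscriber never sees earlier values.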

Can We Also Unsubscribe?

Of course. subscribe() returns a Subscription object on which we can invoke unsubscribe().

var subscriber = broadcast.subscribe(...);
// unsubscribe again
subscriber.unsubscribe();
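
As a rough mental model, again sketched in plain JavaScript rather than taken from RxJS itself, unsubscribing just removes the callback from the subject's list of listeners. createMiniSubject, miniBroadcast, and received are hypothetical names used only in this sketch.

```javascript
// Hypothetical stand-in for Rx.Subject whose subscribe() returns a handle.
function createMiniSubject() {
  var observers = [];
  return {
    subscribe: function (observer) {
      observers.push(observer);
      return {
        // Remove this observer so it no longer receives values.
        unsubscribe: function () {
          var index = observers.indexOf(observer);
          if (index !== -1) {
            observers.splice(index, 1);
          }
        }
      };
    },
    next: function (value) {
      // Iterate over a copy so unsubscribing inside a callback is safe.
      observers.slice().forEach(function (observer) {
        observer(value);
      });
    }
  };
}

var received = [];
var miniBroadcast = createMiniSubject();
var subscription = miniBroadcast.subscribe(function (value) {
  received.push(value);
});

miniBroadcast.next(1);       // delivered
subscription.unsubscribe();
miniBroadcast.next(2);       // not delivered, the subscriber is gone

console.log(received);
```

Calling unsubscribe() a second time is harmless in this sketch, because the observer is no longer in the list.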

What if I Want to Subscribe Only to Specific Events?

In a publish/subscribe environment, the modules in my application might be interested only in certain kinds of events, not all of them. Obviously, we could filter them out inside the subscribe(...) callback. But there’s a more powerful mechanism built into RxJS: lots of operators! We’re interested in the filter operator here.

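Say we want subscriber 1 to receive only values < 50. (This assumes the broadcast emits plain numbers rather than the “Broadcasting..” strings used above.)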

sub1Subscription = broadcast
        .filter(function(value) {
          return value < 50;
        })
        .subscribe(function(value) {
          print('Subs1 got ' + value);
        });

If you’re using ES6, it looks even cleaner:

sub1Subscription = broadcast
        .filter(x => x < 50)
        .subscribe(x => print('Subs1 got ' + x));
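
Conceptually, filter does not change the original stream. It hands back a new observable that forwards only the values passing the predicate, while other subscribers still see everything. The plain-JavaScript sketch below illustrates that idea under simplifying assumptions; it is not how RxJS implements operators, and createMiniSubject and its filter method are hypothetical.

```javascript
// Hypothetical stand-in for Rx.Subject with a simplified filter().
function createMiniSubject() {
  var observers = [];
  var subject = {
    subscribe: function (observer) {
      observers.push(observer);
    },
    next: function (value) {
      observers.forEach(function (observer) {
        observer(value);
      });
    },
    // Return a new subject that re-emits only values matching the predicate.
    // Simplification: this sketch hooks into the source right away.
    filter: function (predicate) {
      var filtered = createMiniSubject();
      subject.subscribe(function (value) {
        if (predicate(value)) {
          filtered.next(value);
        }
      });
      return filtered;
    }
  };
  return subject;
}

var subs1Values = [];
var subs2Values = [];
var miniBroadcast = createMiniSubject();

// Subscriber 1 only wants values below 50.
miniBroadcast
  .filter(function (x) { return x < 50; })
  .subscribe(function (x) { subs1Values.push(x); });

// Subscriber 2 listens to the unfiltered stream.
miniBroadcast.subscribe(function (x) { subs2Values.push(x); });

[12, 87, 49, 50].forEach(function (n) {
  miniBroadcast.next(n);
});

console.log(subs1Values); // [ 12, 49 ]
console.log(subs2Values); // [ 12, 87, 49, 50 ]
```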

Here we go:

Wait, You Also Mentioned ReplaySubject... What's the Difference?

“ReplaySubject emits to any observer all of the items that were emitted by the source Observable(s), regardless of when the observer subscribes.” ReactiveX Docs

Interesting, let’s try that out. All we do is replace the Subject with a ReplaySubject:

var broadcast = new Rx.ReplaySubject();

Now, in the example below, click the broadcast button a couple of times and then click on a subscriber button to register it. Note that it’ll immediately start writing out values which have been published previously.
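
The difference from a plain Subject can be pictured as one extra buffer. Here is a minimal sketch, assuming an unbounded buffer and ignoring everything else the real class does; createMiniReplaySubject, miniReplay, and lateSubscriberLog are hypothetical names, and this is not the RxJS source.

```javascript
// Hypothetical stand-in for Rx.ReplaySubject with an unbounded buffer.
function createMiniReplaySubject() {
  var observers = [];
  var buffer = [];  // every value emitted so far
  return {
    subscribe: function (observer) {
      // Replay the history first, then keep listening for live values.
      buffer.forEach(function (value) {
        observer(value);
      });
      observers.push(observer);
    },
    next: function (value) {
      buffer.push(value);
      observers.forEach(function (observer) {
        observer(value);
      });
    }
  };
}

var lateSubscriberLog = [];
var miniReplay = createMiniReplaySubject();

// Two values are broadcast before anyone subscribes.
miniReplay.next('Broadcasting..1');
miniReplay.next('Broadcasting..2');

// The late subscriber immediately receives both earlier values.
miniReplay.subscribe(function (value) {
  lateSubscriberLog.push(value);
});

// Live values keep arriving as usual.
miniReplay.next('Broadcasting..3');

console.log(lateSubscriberLog);
```

Keeping every value forever can get expensive for long-lived streams. The real ReplaySubject accepts an optional buffer size as its first constructor argument to cap how many values are replayed.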

Besides Subject, the most basic variant, and ReplaySubject, there are others such as AsyncSubject and BehaviorSubject. Simply Google for examples of those.



We obviously only scratched the surface here. RxJS is extremely powerful, especially when combined with asynchronous data “flowing in from your APIs.” It’s hard to get started initially, but I highly recommend playing around with it: use my JSBins above, clone them, and experiment.

Also, I’ve not yet tried it, but the above-described approach could be a very valid alternative for replacing Angular 1.x’s $rootScope.$emit and $rootScope.$broadcast for broadcasting. That would further help to avoid using the $scope and prepare for a migration to Angular 2. That's an idea for another post.

If you enjoyed this post you might want to follow me on Twitter for more news around JavaScript and Angular 2.

Published at DZone with permission of Juri Strumpflohner, DZone MVB.



