
The nature of Observables

ReactiveX scheme

Photo by Kazuky Akayashi on Unsplash.

Recently, while learning Angular, I discovered a library that ships with it, called RxJS. This library is just one of the implementations of a more general set of programming tools - ReactiveX. These libraries provide a handful of classes and a lot of functions that bring the reactive paradigm to most popular programming languages. Even better, ReactiveX promotes the functional paradigm as well.

This library implements a few simple patterns, but I am still astonished by how powerful these patterns are when combined together. Despite its abstraction and simplicity, it can solve many common problems (e.g. with asynchronous programming) in a few lines of code. Let’s see how it works!


The code below will be written in TypeScript. If you would like to run it, install TypeScript and RxJS:

npm install -g typescript
npm install rxjs

Then, you can compile and run it, for example with Node.js:

npx tsc example.ts && node example.js

The core - Observable

Before I cover the subject theoretically, let’s see an example:

import { Observable } from 'rxjs';

const myObservable = new Observable(subscriber => {
  subscriber.next("first value");
  subscriber.next("second value");

  console.log("Some side-effect");

  setTimeout(() => {
    subscriber.next("last value");
    subscriber.complete();
    console.log('Observable has completed');

    subscriber.next("this will not be sent");
  }, 2000);
});

myObservable.subscribe({
  next: value => console.log('value:', value),
  error: err => console.log('error:', err),
  complete: () => console.log("This is the end")
});

Before we analyze it, let’s just notice some keywords: Observable, subscriber, subscribe. These may sound familiar, especially if you have ever heard of the well-known Observer pattern from object-oriented programming. It is actually a similar pattern, but there are several differences.

What is an Observable?

It is a more general concept than a function. A function can basically return only one value. Even if we are smart and return an array of values or an object, it is still a single reference. Observables can do better: they can return multiple values. In this respect they are similar to iterators.

Another important feature of a classic function is that it returns immediately. We cannot return a value 2 seconds after its invocation without resorting to tools like Promises or the async/await keywords. Observables can, so they can be seen as a kind of event emitter as well.

There is another difference. Observables do not take any arguments; there is no such concept for them. However, that does not stop us from creating a factory of Observables: a function that takes parameters and returns a customized Observable. So the workaround is simple.
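A minimal sketch of such a factory could look like the code below. The function name countFrom and its parameters are made up for this illustration; they are not part of RxJS.

```typescript
import { Observable } from 'rxjs';

// Hypothetical factory: takes parameters and returns a customized Observable.
function countFrom(start: number, count: number): Observable<number> {
  return new Observable<number>(subscriber => {
    for (let i = 0; i < count; i++) {
      subscriber.next(start + i);
    }
    subscriber.complete();
  });
}

const collected: number[] = [];
countFrom(5, 3).subscribe(value => collected.push(value));

console.log(collected); // [ 5, 6, 7 ]
```

The Observable itself still takes no arguments; the parameters are simply captured by the subscribe function through the closure.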

More general Promise?

If you know Promises from JavaScript, you have probably noticed that an Observable is just like a Promise, but with multiple results. It can be considered in such a way as well.

Usage analysis

Now that we are armed with this knowledge, let’s recap what was going on in the code above.

const myObservable = new Observable(subscriber => {
  //...
});

An Observable in RxJS is an object that needs a function when it is created. This is called the subscribe function and it is the heart of the Observable. It will be called every time someone subscribes to the Observable. It is supposed to take one argument, usually called subscriber or observer. Here, the subscriber is passed to the function and the function can perform three actions on it:

subscriber.next("next value");
subscriber.error("error message");
subscriber.complete();

The first one, next, is a way of returning values. The second one, error, is used when something goes wrong and tells the observer about it. The last one, complete, signals that no more values will be returned.
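The first example never called error, so here is a small sketch of that path. The Observable and the error message are invented for the illustration.

```typescript
import { Observable } from 'rxjs';

// An Observable that returns one value and then reports a failure.
const failing = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('something went wrong'));
});

const events: string[] = [];
failing.subscribe({
  next: value => events.push(`value: ${value}`),
  error: (err: Error) => events.push(`error: ${err.message}`),
  complete: () => events.push('complete'),
});

console.log(events); // [ 'value: 1', 'error: something went wrong' ]
```

Note that the complete handler is not invoked here: the execution ended with an error instead.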

The most important lines are these ones:

myObservable.subscribe({
  next: value => console.log('value:', value),
  error: err => console.log('error:', err),
  complete: () => console.log("This is the end")
});

Observables have an important method called subscribe. It takes handlers that are invoked when the subscribe function calls the corresponding methods on the subscriber from its parameter.

You can try running the example program without these lines and you will notice that nothing happens. This shows us another important feature of Observables: they are lazy, and nothing is evaluated until someone subscribes. In this respect they resemble functions and differ from Promises: we can define a function without invoking it, but the executor passed to a Promise in JS runs immediately when the Promise is created.
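The difference can be seen in a short sketch. The log array is only a helper for this illustration.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

// The executor of a Promise runs right away, during construction.
const promise = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(1);
});

// The subscribe function of an Observable waits for a subscriber.
const observable = new Observable<number>(subscriber => {
  log.push('subscribe function ran');
  subscriber.next(1);
  subscriber.complete();
});

const beforeSubscribe = log.slice();

// Each subscription triggers a separate run of the subscribe function.
observable.subscribe();
observable.subscribe();

console.log(beforeSubscribe); // [ 'promise executor ran' ]
console.log(log.length);      // 3
```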

The contract of Observables

In our example, you can notice that we cheat a little bit.

subscriber.complete();
console.log('Observable has completed');

subscriber.next("this will not be sent");

We use next after complete, but complete means that we cannot return any value anymore. How do Observables deal with such situations?

If you run the code, you will see this result:

value: first value
value: second value
Some side-effect
value: last value
This is the end
Observable has completed

The value of "this will not be sent" never arrives at the observer. How does it work?

One might assume that the object we passed to the subscribe method is passed directly to the subscribe function from the Observable’s constructor, but that is not true. Actually, the subscribe method creates a wrapper around our handlers so that the observer can be sure that such a weird situation never happens. This assurance is sometimes called the contract of Observables.

It means that the observer can expect values to be sent until one of two situations happens:

  • The Observable completes, which is a signal that its execution has succeeded and no more values will be sent to the observer,
  • The Observable signals an error, which means that something went wrong, no more values will be sent to the observer, and the Observable will not complete.

Please notice that only one of these scenarios can happen, never both. It is also possible that neither happens, in which case the observer can wait for new values forever.
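To get a feeling for how such a wrapper could enforce the contract, here is a deliberately simplified sketch. It is not the actual RxJS implementation; the names Handlers and makeSafe are invented for this illustration and the code does not depend on RxJS at all.

```typescript
// Hypothetical, simplified stand-in for the wrapper created by subscribe.
interface Handlers<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

function makeSafe<T>(handlers: Handlers<T>): Handlers<T> {
  let closed = false;
  return {
    next: value => {
      // Values are forwarded only while the execution is still open.
      if (!closed) handlers.next(value);
    },
    error: err => {
      if (closed) return;
      closed = true;
      handlers.error(err);
    },
    complete: () => {
      if (closed) return;
      closed = true;
      handlers.complete();
    },
  };
}

const seen: string[] = [];
const safe = makeSafe<string>({
  next: value => seen.push(`value: ${value}`),
  error: err => seen.push(`error: ${String(err)}`),
  complete: () => seen.push('complete'),
});

safe.next('last value');
safe.complete();
safe.next('this will not be sent'); // dropped, already completed
safe.error('too late');             // dropped as well

console.log(seen); // [ 'value: last value', 'complete' ]
```

A single flag is enough to guarantee that at most one of error and complete reaches the observer, and that nothing arrives afterwards.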

Observable diagram

Unsubscribe

As you may have noticed in the diagram above, there is another way of finishing an Observable’s execution. Complete and error are invoked by the Observable, but the observer can also act: it can unsubscribe. Let’s see an example.

import { Observable } from 'rxjs';

const myInterval = new Observable(subscriber => {
  let i = 1;

  const interval = setInterval(() => {
    console.log("In interval");
    subscriber.next(i++);
  }, 1000);

  return () => {
    console.log("Unsubscribed")
    clearInterval(interval);
  }
});

const subscription = myInterval.subscribe({
  next: value => console.log('value:', value),
  error: err => console.log('error:', err),
  complete: () => console.log("This is the end")
});

setTimeout(() => {
  subscription.unsubscribe()
}, 5500);

This code prints the lines below, one pair per second. After 5.5 seconds the observer unsubscribes and the output stops.

In interval
value: 1
In interval
value: 2
In interval
value: 3
In interval
value: 4
In interval
value: 5
Unsubscribed

This Observable is a little different from the previous one. The main difference is that the subscribe function returns another function. This returned function is called the unsubscribe function and it is supposed to dispose of all the resources that the Observable has claimed and clean up after it. In our example, we want to clear the interval. Without disposing of it, it would run forever, but we want to stop it when the observer is no longer interested in new values.

The unsubscribe function is also called internally when the Observable completes or signals an error, so the observer does not have to care about disposing of resources in such cases. It means that in the diagram above, it is called at the End stage.

This function is wrapped by the subscribe method and returned as a Subscription object, which has the unsubscribe method. We can call it at any time and the Observable will stop its execution for this observer.
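Both ways of reaching the unsubscribe function can be traced with a synchronous sketch. The helper source and the trace array are invented for this illustration.

```typescript
import { Observable } from 'rxjs';

const trace: string[] = [];

// Hypothetical helper: an Observable that optionally completes on its own.
function source(completes: boolean): Observable<number> {
  return new Observable<number>(subscriber => {
    subscriber.next(1);
    if (completes) {
      subscriber.complete();
    }
    // The unsubscribe function.
    return () => {
      trace.push('teardown');
    };
  });
}

// Case 1: the Observable completes, cleanup happens without our help.
source(true).subscribe({
  next: value => trace.push(`value: ${value}`),
  complete: () => trace.push('complete'),
});

// Case 2: the Observable never ends, so the observer unsubscribes.
const subscription = source(false).subscribe(value => trace.push(`value: ${value}`));
trace.push('before unsubscribe');
subscription.unsubscribe();

console.log(trace);
// [ 'value: 1', 'complete', 'teardown',
//   'value: 1', 'before unsubscribe', 'teardown' ]
```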

Operators

The interval Observable that we wrote above seems like a common use case, so it has already been implemented in the RxJS library. We can use it like this:

import { interval } from 'rxjs';

const subscription = interval(1000).subscribe({
  next: value => console.log(`interval ${value}`),
  error: err => console.log('error:', err),
  complete: () => console.log("This is the end")
});

setTimeout(() => subscription.unsubscribe(), 5500);

It is enough to call a function named interval and pass the desired period in milliseconds. This is an example of a creation operator - a function that returns a new Observable.

There are many other creation operators. The most notable ones include:

  • of - creates an Observable that returns its arguments as values
  • from - creates an Observable from an array, iterator, Promise, etc.
  • ajax - creates an Observable from an HTTP request, with its response as a value
  • empty - returns an Observable which completes immediately without any values, useful when combining with other operators
  • fromEvent - creates an Observable that listens for a particular event on an HTML element.
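A quick sketch of the synchronous ones is shown below. It uses the EMPTY constant, which RxJS exports as the ready-made counterpart of empty(); the out array is just a helper for the illustration.

```typescript
import { EMPTY, from, of } from 'rxjs';

const out: string[] = [];

// of: every argument becomes a value.
of(1, 2, 3).subscribe(value => out.push(`of: ${value}`));

// from: every element of the array becomes a value.
from(['a', 'b']).subscribe(value => out.push(`from: ${value}`));

// EMPTY: no values at all, only the completion signal.
EMPTY.subscribe({
  next: () => out.push('EMPTY: value'),
  complete: () => out.push('EMPTY: complete'),
});

console.log(out);
// [ 'of: 1', 'of: 2', 'of: 3', 'from: a', 'from: b', 'EMPTY: complete' ]
```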

Pipeable operators

There is also another type of operator, called pipeable. Pipeable operators transform one Observable into another. Here comes the power and elegance of functional programming. Let’s see a simple example.

import { range } from 'rxjs';
import { filter, map, switchMap, tap } from 'rxjs/operators';

const foo = range(1, 10).pipe(
  map(x => x * x),
  tap(x => console.log("-->", x)),
  filter(x => x % 3 == 0),
  switchMap(x => range(x, 3))
);

foo.subscribe(value => console.log(value));

This example prints:

--> 1
--> 4
--> 9
9
10
11
--> 16
--> 25
--> 36
36
37
38
--> 49
--> 64
--> 81
81
82
83
--> 100

Let’s see what is going on here.

The range creation operator creates an Observable which returns the values from 1 to 10. Then we use the pipe method. Its job is to take pipeable operators, apply them to the Observable, and return a new one with all the logic from the operators applied.

Importantly, the input Observable is not touched; it remains exactly the same. Observables like immutability. This is very helpful when we want to use an Observable in two places: directly in the first one, and after some transformations in the second.
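A short sketch of using one Observable in two places; the variable names are chosen only for this illustration.

```typescript
import { range } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers = range(1, 3);
// pipe returns a new Observable and leaves numbers as it was.
const squares = numbers.pipe(map(x => x * x));

const transformed: number[] = [];
const direct: number[] = [];

squares.subscribe(value => transformed.push(value));
numbers.subscribe(value => direct.push(value));

console.log(transformed); // [ 1, 4, 9 ]
console.log(direct);      // [ 1, 2, 3 ]
```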

What exactly is an operator?

It is a function, which returns another function. The returned function is supposed to take an Observable as a parameter and return a new Observable based on the input. Easy, isn’t it? So let’s go through the code further!

  map(x => x * x),

The map operator is similar to the well-known map method of arrays. It returns an Observable which just passes all the values from the source, but transformed in some way. Here, we create an Observable which returns squares of numbers from the source Observable.
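To make the idea of a function that returns a function more concrete, here is a hand-written, simplified imitation of map. The name simpleMap is invented for this sketch, and the real operator handles more cases, for example errors thrown by the transforming function.

```typescript
import { Observable, range } from 'rxjs';

// Hypothetical operator: a function that returns a function
// from one Observable to another.
function simpleMap<T, R>(project: (value: T) => R) {
  return (source: Observable<T>): Observable<R> =>
    new Observable<R>(subscriber =>
      // Nothing runs until someone subscribes to the result.
      // Returning the inner Subscription means that unsubscribing
      // from the result also unsubscribes from the source.
      source.subscribe({
        next: value => subscriber.next(project(value)),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete(),
      })
    );
}

const results: number[] = [];
range(1, 3)
  .pipe(simpleMap((x: number) => x * x))
  .subscribe(value => results.push(value));

console.log(results); // [ 1, 4, 9 ]
```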

Marble diagrams

It is the right time to visualize it. Fortunately, in the world of Observables, someone has already invented a friendly way of presenting how the operators work. Let me show you a marble diagram.

A marble diagram

An introduction to marbles from docs of RxJS.

It is a convention used in the documentation and many other references about ReactiveX. Moreover, even the tests for the library are written using marbles! Let’s see such a test:

it('should allow unsubscribing early and explicitly', () => {
    const e1 =   hot('--a--b--b--d--a--f--|');
    const e1subs =   '^         !          ';
    const expected = '--a--b-----          ';
    const unsub =    '          !          ';

    const result = (<any>e1).pipe(distinct());

    expectObservable(result, unsub).toBe(expected);
    expectSubscriptions(e1.subscriptions).toBe(e1subs);
  });

It is one of the tests of the distinct operator. The test does not matter now, but you can see that the marble diagrams are everywhere in the Observables world.
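We can write marble tests for our own code too, using the TestScheduler class from rxjs/testing. Below is a minimal sketch that checks the map operator. The comparison function based on JSON.stringify is my own shortcut to keep the example free of other dependencies; a real test suite would normally plug in its assertion library there.

```typescript
import { TestScheduler } from 'rxjs/testing';
import { map } from 'rxjs/operators';

// The callback decides how actual and expected results are compared.
const scheduler = new TestScheduler((actual, expected) => {
  if (JSON.stringify(actual) !== JSON.stringify(expected)) {
    throw new Error('Marble test failed');
  }
});

let marbleTestPassed = false;

scheduler.run(({ cold, expectObservable }) => {
  // Letters are placeholders; the second argument maps them to real values.
  const source   = cold('--a--b--c--|', { a: 1, b: 2, c: 3 });
  const expected =      '--x--y--z--|';

  expectObservable(source.pipe(map(n => n * n))).toBe(expected, { x: 1, y: 4, z: 9 });
});

// This line is reached only if run() did not throw.
marbleTestPassed = true;
console.log('marble test passed:', marbleTestPassed);
```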

Back to our code

So the map operator transforms values of the source Observable and returns a new one. This is its diagram:

--1---2---3-----|
 map(x => x * x)
--1---4---9-----|

Pretty easy. The next operator in our code is:

  tap(x => console.log("-->", x))

Tap

A pipe has ends, but it can also have a tap.

The tap operator allows us to peek at a value and do something with it, i.e. perform a side-effect. It does nothing to the source Observable, so its diagram is fairly simple:

--1---4---9-----|
  tap(x => console.log("-->", x))
--1---4---9-----|

I emphasize the word side-effect for a reason. Side-effects are something that we are supposed to avoid in functional programming, but of course, programming real-life applications without side-effects would be impossible. The tap operator is a place where we can use side-effects without being criticized, whereas in other operators we should avoid them. Operators should be pure: they should not perform any actions outside their scope. That is one of the concepts of being functional, but it is not only a theoretical rule. It also helps in writing clean code.
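A small sketch of this division of labour: the side-effect lives in tap, while the function passed to map stays pure. The arrays are helpers for the illustration.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const sideEffects: string[] = [];
const values: number[] = [];

of(1, 2, 3)
  .pipe(
    // Side-effect: records what passes by, the values go on unchanged.
    tap(x => sideEffects.push(`saw ${x}`)),
    // Pure: the result depends only on the input.
    map(x => x + 1)
  )
  .subscribe(value => values.push(value));

console.log(sideEffects); // [ 'saw 1', 'saw 2', 'saw 3' ]
console.log(values);      // [ 2, 3, 4 ]
```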

Filter

Next line:

  filter(x => x % 3 == 0),

The filter operator is as similar to the array’s filter method as the map operator is to the array’s map. It passes on only those values from the source Observable that match the criteria. In our case, it will take only the numbers divisible by 3.

--1---4---9---16---25---36--|
  filter(x => x % 3 == 0),
----------9-------------36--|

SwitchMap

The time has come for the most complicated operator of the day.

  switchMap(x => range(x, 3))

As you can guess from its name (…Map), it works similarly to the map operator. It takes a value and transforms it but, unlike the normal map, it transforms it into an Observable. So at this point we have an Observable of Observables of numbers. It gets weird, but do not worry!

It has one more job as well - to flatten the Observable of Observables into just one normal Observable. How exactly does it do that? Read the docs!

Here is the diagram in our situation:

--9----------36-----------81---------|
  switchMap(x => range(x, 3))
--9-10-11----36-37-38-----81-82-83---|

The switchMap operator transformed one value into multiple ones using the Observables created inside. Magic! A very useful kind, though.
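In our example every inner Observable finishes at once, so the switch part of the name stays invisible. The sketch below makes it visible. It relies on Subject, an RxJS class not covered in this article; I use it only as an Observable whose values can be pushed by hand, and all the variable names are invented for the illustration.

```typescript
import { Subject } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

const outer = new Subject<string>();
const innerA = new Subject<number>();
const innerB = new Subject<number>();

const seen: string[] = [];

outer
  .pipe(
    switchMap(key =>
      (key === 'a' ? innerA : innerB).pipe(map(n => `${key}${n}`))
    )
  )
  .subscribe(value => seen.push(value));

outer.next('a');  // switchMap subscribes to innerA
innerA.next(1);   // passes through as 'a1'
outer.next('b');  // switchMap drops innerA and subscribes to innerB
innerA.next(2);   // ignored, nobody listens to innerA anymore
innerB.next(1);   // passes through as 'b1'

console.log(seen); // [ 'a1', 'b1' ]
```

When a new value arrives from the source, switchMap unsubscribes from the previous inner Observable and follows only the newest one.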

More operators!

Fortunately, there are plenty of them. It is worth reading about them in the docs, so that you can pick an already-written operator instead of crafting your own code. They really cover most cases of combining Observables, mapping, etc.

Summary

Observables and their operators are powerful tools for dealing with asynchronous programming. They are heavily used in e.g. Angular, where services should be written using them. No doubt, they help to create reactive code.

Advantages

  • Abstraction - the concept of an Observable is very abstract, which allows us to use it in many places. It is easy to find use-cases where it just fits.
  • Reusability - because of its laziness and plenty of operators, which are immutable, the code we write with Observables becomes highly reusable. With operators and their composition, we build our programs like houses out of bricks.
  • Code is resilient to change - we can use Observables like safe interfaces in our app. One can easily be swapped for another that returns the same type of Observable but behaves differently, and everything that refers to it can still work.
  • Code is cleaner - functional programming promotes fewer indirect dependencies, so our code is cleaner and easier to debug.
  • Code is elegant - thanks to pipeable operators, we can read Observable code from top to bottom and the logic is straightforward. In many cases, it is worth using Observables instead of some native code, because it will just look better and be familiar to other developers.
  • No need for async/await - it is possible to write an entire application with Observables without touching the async/await keywords or Promises, which can sometimes be painful.
  • Ready solutions - there are many already implemented operators for the most common scenarios we face when programming. It is better to use an already written and tested function than to invent the same one once more.

Disadvantages

  • Large surface - there are a lot of operators to learn, and it is sometimes hard to find an appropriate one.
  • Steep learning curve - understanding all of the many operators deeply is not easy, especially at the beginning. What is the difference between flatMap, concatMap, and switchMap? When to use forkJoin instead of combineLatest? Many details are not easy to explain to beginners. Apart from that, it is fairly easy to forget about unsubscribing, which can lead to leaks and performance issues.
  • Temptation - to use it everywhere, even if it is not necessary at all. A rule of thumb could be that Observables should be used for event processing, but not for data transformations.
  • Poor documentation - in my opinion, the documentation is pretty poor. There are many useful operators which do not have much explanation, and error handling is often not touched in any way. Moreover, there can be some confusion between versions.

It is your decision whether to use ReactiveX or not, but it is definitely worth knowing, no matter what language you write in. The elegance and reusability provided by Observables were enough to change my mind about programming front-end applications, and even about programming in general. They are also an example that functional programming can be successfully applied in real-world programs. Be reactive and be functional!