Learning observables by cutting the fat away: The 10 most used observable operators

Learning observables is much harder than learning to use promises, because of all the different operators and methods. But learning observables is a very good investment because of the increasing use of reactive programming, as in Redux. Observables make it easier to manage state and caching, and to treat async and sync logic in the same manner.

How observables are an enhanced observer pattern

Some might recognize that observables are very similar to the GoF Observer pattern, but with some improvements taken into consideration:

In the Observer pattern, there are two types of objects: subjects and observers. Observers are registered with a subject, and every time the subject changes, it notifies them by calling each registered observer's notify callback.
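The classic pattern described above can be sketched in a few lines of plain TypeScript. This is only an illustration: the names ClassicSubject, register and change are hypothetical and not taken from any library.

```typescript
// Minimal sketch of the classic GoF Observer pattern (hypothetical names).
interface Observer<T> {
  notify(value: T): void;
}

class ClassicSubject<T> {
  private observers: Observer<T>[] = [];

  // Register an observer so it is notified of future changes.
  register(observer: Observer<T>): void {
    this.observers.push(observer);
  }

  // Anyone holding a reference to the subject can trigger a notification.
  change(value: T): void {
    for (const observer of this.observers) {
      observer.notify(value);
    }
  }
}

const received: string[] = [];
const classicSubject = new ClassicSubject<string>();
classicSubject.register({ notify: value => received.push(`A got ${value}`) });
classicSubject.register({ notify: value => received.push(`B got ${value}`) });

classicSubject.change('hello');
console.log(received); // ['A got hello', 'B got hello']
```

Note that the same object is used both for registering observers and for publishing changes.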

The problem here is that you often want to segregate reads from writes in your code (as in CQRS), so that you can distinguish the classes that publish from the ones that only subscribe to subject notifications. You might not want everyone in your code to be able to notify observers. For that reason, observables split the Observer pattern's Subject into a subject (the publisher) and an observable (which can only be subscribed to, by registering observers/callbacks).

Subjects should not be passed around in the code. Instead, a subject can create an observable, which can be passed around, as it can’t publish data and can only listen for emits from the subject.
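In RxJs this split is commonly done with Subject.asObservable(). The sketch below assumes a hypothetical CounterService; only Subject, Observable and asObservable come from RxJs.

```typescript
import { Observable, Subject } from 'rxjs';

// Hypothetical service: the subject stays private, only the observable is exposed.
class CounterService {
  private readonly countSubject = new Subject<number>();
  private count = 0;

  // Read side: consumers can subscribe, but they cannot call next().
  readonly count$: Observable<number> = this.countSubject.asObservable();

  // Write side: only the service itself publishes values.
  increment(): void {
    this.count += 1;
    this.countSubject.next(this.count);
  }
}

const seen: number[] = [];
const service = new CounterService();
service.count$.subscribe(value => seen.push(value));

service.increment();
service.increment();
console.log(seen); // [1, 2]
```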

RxJs

RxJs is the implementation of the Observable pattern that is used in Angular. Angular uses observables for, among other things, handling HTTP requests and @Output’s EventEmitter.

Why use observables?

Observables should be used for decoupling writes from reads and for enabling one-to-many communication in an app. For these reasons, observables help you build scalable applications. They are also used in Redux, a common standard for organizing observables in a Store pattern.

Hot and Cold observables

There are two types of observables: hot and cold. Hot observables publish data regardless of whether any observer has subscribed, just like a TV station. Cold observables only start producing data when an observer subscribes.
In Angular, HttpClient only performs an HTTP request when the subscribe method is called (registering an observer), because it returns a cold observable. In an Angular application, hot observables can be used for preloading data once the application starts and only handing it out when subscribe is called, e.g. with the shareReplay operator.
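The difference can be sketched with plain RxJs building blocks. In this illustration, a hand-written Observable stands in for a cold source and a Subject stands in for a hot one. The variable names are made up for the example.

```typescript
import { Observable, Subject } from 'rxjs';

const log: string[] = [];

// Cold: the producer function runs once per subscriber, at subscribe time.
let producerRuns = 0;
const cold$ = new Observable<number>(subscriber => {
  producerRuns += 1;
  subscriber.next(producerRuns);
  subscriber.complete();
});

// Nothing has been produced yet; each subscribe call starts its own run.
cold$.subscribe(value => log.push(`cold A: ${value}`));
cold$.subscribe(value => log.push(`cold B: ${value}`));

// Hot: the subject publishes whether or not anyone is listening.
const hot$ = new Subject<string>();
hot$.next('missed'); // no subscriber yet, so this value is lost
hot$.subscribe(value => log.push(`hot: ${value}`));
hot$.next('received');

console.log(log); // ['cold A: 1', 'cold B: 2', 'hot: received']
```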

More on hot and cold observables here.

Variants of Subjects

In RxJs there are a couple of variants of Subjects:

  • Subject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Subject

Subject is the standard subject in RxJs. It does not store any previously emitted values, so it only notifies observers that were already subscribed when a value is published. Observers that subscribe later miss the earlier values.

BehaviorSubject

BehaviorSubject will remember the last emitted value and will use it to notify observers that are registered after the value was published.

ReplaySubject

ReplaySubject is a lot like BehaviorSubject, but it remembers a user-specified number of emitted values. When observers register, the ReplaySubject will “replay” those previous values to them.

AsyncSubject

AsyncSubject is a lot like BehaviorSubject as well, in that it remembers the last emitted value, but it only publishes that value when the Observable sequence completes.
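To compare the four variants side by side, the sketch below runs the same scenario against each of them: three values are published, then an observer subscribes late, then two more values are published and the subject completes. The helper lateSubscriberSees is hypothetical and exists only for this illustration.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Hypothetical helper: returns what an observer sees when it subscribes
// after values 1, 2 and 3 have already been published.
function lateSubscriberSees(subject: Subject<number>): number[] {
  const seen: number[] = [];
  subject.next(1);
  subject.next(2);
  subject.next(3);
  subject.subscribe(value => seen.push(value)); // late subscriber
  subject.next(4);
  subject.next(5);
  subject.complete();
  return seen;
}

const results = {
  subject: lateSubscriberSees(new Subject<number>()),           // [4, 5]
  behavior: lateSubscriberSees(new BehaviorSubject<number>(0)), // [3, 4, 5]
  replay: lateSubscriberSees(new ReplaySubject<number>(2)),     // [2, 3, 4, 5]
  async: lateSubscriberSees(new AsyncSubject<number>()),        // [5]
};

console.log(results);
```

The BehaviorSubject is created with an initial value (0 here), and the ReplaySubject with a buffer size of 2.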

The 10 most used Observable operators

Disclaimer: this is from my own experience, and you might have other use cases that require other functions. But from working with a lot of different Angular apps in a lot of different domains, these are, in my opinion, the most used Observable operators:

  1. Subscribe
  2. Pipe
  3. Map
  4. Filter
  5. SwitchMap
  6. MergeMap
  7. ForkJoin
  8. CombineLatest
  9. WithLatestFrom
  10. DistinctUntilChanged

Because observables are streams of data, much like arrays spread out over time, a lot of the Observable operators work the same way as ES6 array methods, such as map and filter.
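As a small sketch of that similarity, the same filter and map steps are applied once to an array and once to an observable created from that array. The sample data is made up for the illustration.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const numbers = [1, 2, 3, 4, 5, 6];

// Array methods: evaluated immediately, return a new array.
const fromArray = numbers.filter(n => n % 2 === 0).map(n => n * 10);

// Observable operators: same logic, applied to each value as it is emitted.
const fromStream: number[] = [];
from(numbers)
  .pipe(
    filter(n => n % 2 === 0),
    map(n => n * 10)
  )
  .subscribe(value => fromStream.push(value));

console.log(fromArray);  // [20, 40, 60]
console.log(fromStream); // [20, 40, 60]
```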

Subscribe

Subscribe registers an observer (one or more callbacks), which is called when the observable publishes data. For cold observables, the pipe of operators will not run until subscribe has been called, e.g. HttpClient in Angular.

Example:

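import { of } from 'rxjs';

// assumed sample source; any observable works here
const source = of(1, 2, 3);

// the observer object handles next, error and complete notifications
const subscription = source.subscribe({
  next: x => console.log('Next: %s', x),
  error: err => console.log('Error: %s', err),
  complete: () => console.log('Completed')
});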

Pipe

Pipe was introduced in RxJs 5.5 to allow piping operators instead of chaining them with dots. As of RxJs 6, pipe is the container for RxJs operators and chaining operators with dots is no longer possible. It takes the operators as arguments and applies them in order.

Example:

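import { of } from 'rxjs';
import { filter, map, scan } from 'rxjs/operators';

// assumed sample source
const source$ = of(1, 2, 3, 4);
source$.pipe(
  filter(x => x % 2 === 0),     // keep even numbers
  map(x => x + x),              // double them
  scan((acc, x) => acc + x, 0)  // emit the running total
)
.subscribe(x => console.log(x));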

Map

Map transforms each published value with a projection function and emits the result, just like map on an array.

Example:

source.pipe(map(({ name }) => name));

Map docs.

Filter

Filter only lets through the values for which a boolean predicate returns true and drops the rest from the Observable stream.

Example:

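import { interval } from 'rxjs';
import { filter } from 'rxjs/operators';

// assumed source: emits 0, 1, 2, ... once per second
const source = interval(1000);
// only pass on numbers greater than 5
const example = source.pipe(filter(num => num > 5));
/*
  "Number greater than 5: 6"
  "Number greater than 5: 7"
  "Number greater than 5: 8"
  "Number greater than 5: 9"
*/
const subscribe = example.subscribe(val =>
  console.log(`Number greater than 5: ${val}`)
);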

Filter docs

SwitchMap

SwitchMap is a lot like map, but its mapping function returns an inner Observable. Whenever a new value arrives, it unsubscribes from the previous inner observable and switches to the new one.

Example:

import { fromEvent, interval } from 'rxjs';
import { mapTo, switchMap } from 'rxjs/operators';

//emit every click
const source = fromEvent(document, 'click');
//if another click comes within 3s, message will not be emitted
const example = source.pipe(
  switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
//(click)...3s...'Hello, I made it!'...(click)...2s(click)...
const subscribe = example.subscribe(val => console.log(val));

SwitchMap docs.

MergeMap

MergeMap has the same input and output as SwitchMap, but unlike SwitchMap it does not unsubscribe from the previous inner observables. Instead, it keeps all of them subscribed and merges their values into the single output Observable.

Example:

import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const source = of('Hello');
//map to inner observable and flatten
const example = source.pipe(mergeMap(val => of(`${val} World!`)));
//output: 'Hello World!'
const subscribe = example.subscribe(val => console.log(val));

MergeMap docs.
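The difference between SwitchMap and MergeMap only shows up when an earlier inner observable is still emitting after a new outer value arrives. The sketch below makes that visible by using Subjects as inner observables, so the timing can be controlled by hand. The names outer and inner are made up for the illustration.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Inner observables are subjects so that we decide exactly when they emit.
const inner = {
  a: new Subject<string>(),
  b: new Subject<string>(),
};
const outer = new Subject<'a' | 'b'>();

const switched: string[] = [];
const merged: string[] = [];
outer.pipe(switchMap(key => inner[key])).subscribe(value => switched.push(value));
outer.pipe(mergeMap(key => inner[key])).subscribe(value => merged.push(value));

outer.next('a');
inner.a.next('a1'); // both operators are listening to inner.a
outer.next('b');    // switchMap drops inner.a, mergeMap keeps it
inner.a.next('a2'); // only mergeMap still receives this
inner.b.next('b1'); // both operators are listening to inner.b

console.log(switched); // ['a1', 'b1']
console.log(merged);   // ['a1', 'a2', 'b1']
```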

ForkJoin

ForkJoin waits for an array of observables to complete and then publishes a single array containing the last value of each. This is the RxJs equivalent of Promise.all(), and you would often use this operator when firing HTTP requests in parallel and waiting for all of them to complete. Only use this for observables that complete. If they don’t complete, you might want to use CombineLatest instead.

Example:

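import { forkJoin, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

//resolves after 5s with a message containing the given value
const myPromise = val =>
  new Promise(resolve =>
    setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
  );

const source = of([1, 2, 3, 4, 5]);
//emit array of all 5 results
const example = source.pipe(mergeMap(q => forkJoin(q.map(myPromise))));
//nothing runs until we subscribe; logs the array once all promises resolve
const subscribe = example.subscribe(val => console.log(val));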

ForkJoin docs.

CombineLatest

CombineLatest subscribes to multiple streams and, once every stream has published at least once, emits the latest value of each stream whenever any of the streams publishes.

Example:

import { combineLatest, timer } from 'rxjs';
import { map } from 'rxjs/operators';

//timerOne emits first value at 1s, then once every 4s
const timerOne = timer(1000, 4000);
//timerTwo emits first value at 2s, then once every 4s
const timerTwo = timer(2000, 4000);
//timerThree emits first value at 3s, then once every 4s
const timerThree = timer(3000, 4000);

//combineLatest emits an array of the latest values; map projects it to a string
const combinedProject = combineLatest([timerOne, timerTwo, timerThree]).pipe(
  map(([one, two, three]) => {
    return `Timer One (Proj) Latest: ${one}, 
              Timer Two (Proj) Latest: ${two}, 
              Timer Three (Proj) Latest: ${three}`;
  })
);
//log values
const subscribe = combinedProject.subscribe(latestValuesProject =>
  console.log(latestValuesProject)
);

CombineLatest docs.

WithLatestFrom

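WithLatestFrom combines each value from the source with the latest value published by another stream. Unlike CombineLatest, it only emits when the source publishes. New values on the other stream are remembered but do not trigger an emit by themselves.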

Example:

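import { interval } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

//assumed sources, matching the labels in the output string
const source = interval(5000);
const secondSource = interval(1000);
const example = source.pipe(
  withLatestFrom(secondSource),
  map(([first, second]) => {
    return `First Source (5s): ${first} Second Source (1s): ${second}`;
  })
);

const subscribe = example.subscribe(val => console.log(val));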

WithLatestFrom docs.
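To see how WithLatestFrom differs from CombineLatest, the sketch below feeds the same two streams into both. Subjects are used so that the order of events is explicit. The names clicks and settings are hypothetical.

```typescript
import { combineLatest, Subject } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

const clicks = new Subject<string>();
const settings = new Subject<number>();

const combined: string[] = [];
const withLatest: string[] = [];

combineLatest([clicks, settings]).subscribe(([click, setting]) =>
  combined.push(`${click}:${setting}`)
);
clicks
  .pipe(withLatestFrom(settings))
  .subscribe(([click, setting]) => withLatest.push(`${click}:${setting}`));

settings.next(1);  // nothing emitted yet: clicks has no value
clicks.next('a');  // both emit 'a:1'
settings.next(2);  // only combineLatest emits ('a:2')
clicks.next('b');  // both emit 'b:2'

console.log(combined);   // ['a:1', 'a:2', 'b:2']
console.log(withLatest); // ['a:1', 'b:2']
```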

DistinctUntilChanged

DistinctUntilChanged only emits a value when it differs from the previous one. By default it compares values with ===, and it is possible to provide a custom comparer function that decides when two inputs count as the same. This is helpful for avoiding repeated downstream work when the input has not changed.

Example:

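import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

//default comparison is ===, so duplicates must be the same object reference
const subscription = from(myArrayWithDuplicateObjects)
  .pipe(distinctUntilChanged())
  //output: DISTINCT OBJECTS: {name: 'Test'}
  .subscribe(val => console.log('DISTINCT OBJECTS:', val));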

DistinctUntilChanged docs.
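The custom comparer mentioned above matters as soon as the stream contains objects that are equal in content but are different references. The sketch below uses a hypothetical User type and sample data to show both behaviours.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

interface User {
  name: string;
}

// Four separate object references; the first two have the same content.
const users: User[] = [
  { name: 'Test' },
  { name: 'Test' },
  { name: 'Other' },
  { name: 'Test' },
];

// Default comparison (===): every object is a new reference, so all pass.
const byReference: string[] = [];
from(users)
  .pipe(distinctUntilChanged())
  .subscribe(user => byReference.push(user.name));

// Custom comparer: consecutive users with the same name count as unchanged.
const byName: string[] = [];
from(users)
  .pipe(distinctUntilChanged((previous, current) => previous.name === current.name))
  .subscribe(user => byName.push(user.name));

console.log(byReference); // ['Test', 'Test', 'Other', 'Test']
console.log(byName);      // ['Test', 'Other', 'Test']
```

Only consecutive duplicates are dropped, which is why the last 'Test' still comes through.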

Wrap up

We went through the basic workings of observables, hot and cold observables, the different RxJs Subjects and, in my experience, the ten most common RxJs operators. This should give you a foundation for getting 80 % of the results by knowing only the critical 20 %. To really get familiar with observables, you should start using these operators in your applications, and you might even consider using them to build reactive applications.

Where to go next

For more info on RxJs, I recommend reading the official documentation.

Do you want to become an Angular architect? Check out Angular Architect Accelerator.
