learn more on RxJS

Rx JS Top Operators with examples

import { interval, of, concat, from, merge, timer } from "rxjs";
import {
  take, map, tap, startWith, share, mapTo, timeout, delay, concatMap, 
  mergeMap, switchMap, exhaustMap
} from "rxjs/operators";

Let’s start operators with definition and examples:


from operator

Turn an array, promise, or iterable into an observable.
💡 This operator can be used to convert a promise to an observable!
💡 For arrays and iterables, all contained values will be emitted as a sequence!
💡 This operator can also be used to emit a string as a sequence of characters!

const arr1$ = from([10, 8, 6]);

arr1$.subscribe(console.log);

*output*
 10 8 6
from([{ a: 1, b: 3 }, { a: 10, b: 30 }]).subscribe(console.log);

*output*
{ a: 1, b: 3 }    
{ a: 10, b: 30 }
from("JSMount").subscribe(console.log);

*output*
J S M O U N T (It will print in a sequence one by one )
from(Promise.resolve("Js-Mount")).subscribe(console.log);

*output*
JS-Mount   (It will print promise output once it is resolved)


of operator

It can accept any number of arguments and any type of values here object, string or function, etc.. and return all in a sequence.

const source3 = of({ name: "Brian" }, [1, 2, 3], function hello() {
  return "Hello";
});
source3.subscribe(val => console.log(val));

*output*
{ name: "Brian" }
[1, 2, 3]
ƒ hello() {
    return "Hello";
}


What is the difference between from and of operators?

from: It creates observable from an array, promise, or iterable and it takes only one argument. For arrays, iterables, and strings, all contained values will be emitted as a sequence.

of: Create observable with variable amounts of values, emit values in the sequence, but arrays as a single value.

const values = [1, 2, 3];
from(values); // 1 ... 2 ... 3

const values = [1, 2, 3];
of(values, 'hi', 4, 5); // [1, 2, 3] ... 'hi' ... 4 ... 5


interval operator

let myInterval = interval(4000);
myInterval.subscribe(console.log);

it will console 0 1 2 3 …. with an interval of 4 seconds same as setInterval of JavaScript.

By Using take / takeWhile / takeUntil operators of RxJS, we can stop this interval.

myInterval.pipe(take(3)).subscribe(value => {
  console.log(value);
});

*output*
0 1 2 (it will emit only 3 values)


map operator

map operator works the same as a JS Array map function. It applies projection function with each value from source & and return processed observable.

const source1 = of(
  { name: "A", id: 1 },
  { name: "B", id: 2 },
  { name: "C", id: 3 }
);

source1.pipe(map((value: any) => value.name)).subscribe(console.log);
*output*
 A B C 

source1.pipe(map((value: any) => value.id + '-' + value.name)).subscribe(console.log);
*output*
1-A 2-B 3-C


mapTo operator

mapTo operator takes one value as an argument and emits that value for each source

of(1,2,3).pipe(mapTo('Hello')).subscribe(console.log);
*output*
Hello Hello Hello (it will emit Hello for each source means we have mapped each source value with our constant value).


tap operator

tap operator basically used for logging. We can log values before the map and after the map operation. If we try to change the real source value inside the tap arrow function, it will not impact anything.

of({ name: "A", id: 1 })
  .pipe(
    tap(val => console.log("here is an object before map - " + val.name)),
    map((value: any) => value.id + " @ " + value.name),
    tap(val => console.log("After performing map on this - " + val))
  )
  .subscribe(console.log);

*output*

here is an object before map - A
After performing map on this - 1 @ A
1 @ A

tap also accepts an object map to log next, error, and complete

const example = source2
  .pipe(
    map(val => val + 10),
    tap({
      next: val => {
        // on next 11, etc.
        console.log("on next", val);
      },
      error: error => {
        console.log("on error", error.message);
      },
      complete: () => console.log("on complete")
    })
  )
  // output: 11, 12, 13, 14, 15
  .subscribe(val => console.log(val));


startWith operator

If you want to start observable source emit with the predefined value you can use startWith.

const source5 = from(["3", "7", "9"]);
source5.pipe(startWith("1")).subscribe(console.log);

*output*
1 3 7 9 

*Note: we can also provide multiple values as an arguments in startWith

of(11, 12)
  .pipe(startWith("1", "4", "10"))
  .subscribe(console.log);

*output*
11 12 1 4 10


concat operator

It accepts a list of Observables and subscribes to them one after another when the previous Observable completes.

Subscribe to observables in order as previous completes.
💡 You can think of concat like a line at an ATM, the next transaction (subscription) cannot start until the previous complete!💡 If throughput, not order, is a primary concern, try merge instead!

const source7 = of("First observable with delay of 2 sec").pipe(delay(2000));
const source8 = of("Second observable without any delay");

concat(source7, source8).subscribe(console.log);

*output*
First observable with delay of 2 sec
Second observable without any delay


merge operator

simply merge more than one observable into a single one, here order does not matter 

* Turn multiple observables into a single observable.💡 This operator can be used as either a static or instance method!💡 If order throughput is a primary concern, try concat instead!

const source7 = of("First observable with delay of 2 sec").pipe(delay(2000));
const source8 = of("Second observable without any delay");
merge(of("************"), source7, source8).subscribe(console.log);

*output* (it does not wait for first source to be complete, it just emit which source received)

Second observable without any delay
First observable with delay of 2 sec


concatMap operator

concatMap accepts as a parameter a function that is invoked for every item from its source and that returns an inner Observable. concatMap then calls its callback only when the previous inner Observables completes (here order of inner observable matters).

const source10 = of(
  { name: "concatMap-A-with-3-second-delay", time: 3000 },
  { name: "concatMap-B-with-1-second-delay", time: 1000 }
);

source10
  .pipe(concatMap(value => of(value.name).pipe(delay(value.time))))
  .subscribe(console.log);

*output*
concatMap-A-with-3-second-delay
concatMap-B-with-1-second-delay


What is the difference between concat and concatMap RxJS operators?

The main difference between concatMap and concat is that concatMap accepts as a parameter a function that is invoked for every item from its source and that returns an inner Observable (it maps each item from its source to an Observable). concatMap then calls its callback only when the previous inner Observables completes.

On the other hand, concat just accepts a list of Observables and subscribes to them one after another when the previous Observable completes.


mergeMap operator

mergeMap operators take a function as a parameter and return an inner observable. Its call back subscribes inner observables immediately means does not wait for the first one to be emitted.

const source11 = of(
  { name: "mergeMap-A-with-3-second-delay", time: 3000 },
  { name: "mergeMap-B-with-1-second-delay", time: 1000 }
);

source11
  .pipe(mergeMap(value => of(value.name).pipe(delay(value.time))))
  .subscribe(console.log);

*output*
mergeMap-B-with-1-second-delay
mergeMap-A-with-3-second-delay


What is the difference between concatMap and mergeMap?

concatMap does not subscribe to the next observable until the previous completes, the value from the source delayed by 3000ms will be emitted first.
Contrast this with mergeMap which subscribes immediately to inner observables, the observable with the lesser delay (1000ms) will emit, followed by the observable which takes 3000ms to complete.


switchMap operator

The main difference between switchMap and other flattening operators is the canceling effect. On each emission, the previous inner observable (the result of the function you supplied) is canceled and the new observable is subscribed. You can remember this by the phrase switch to a new observable.
switchMap maintains only one inner subscription at a time. (Great example of its google search functionality)

const source12 = of(
  { name: "switchMap-A-with-3-second-delay", time: 3000 },
  { name: "switchMap-B-with-1-second-delay", time: 1000 }
);

source12
  .pipe(switchMap(value => of(value.name).pipe(delay(value.time))))
  .subscribe(console.log);

*output* (it canceled first inner observable as it get second one)
switchMap-B-with-1-second-delay


What is the difference between combineLatest and forkjoin operators?

forkJoin – When all observables complete, emit the last emitted value from each. If an inner observable does not complete, forkJoin will never emit a value!

forkJoin uses – If you wish to issue multiple requests on page load (or some other event) and only want to take action when a response has been received for all. In this way, it is similar to how you might use Promise.all.
Be aware that if any of the inner observables supplied to forkJoin error you will lose the value of any other observables that would or have already completed if you do not catch the error on the inner observable.

Running Example:
https://stackblitz.com/edit/fork-join-and-combinelatest?file=index.ts

const a =  of('B').pipe(delay(2000));
const b =  of('A').pipe(delay(1000));

forkJoin({a, b}).subscribe(({a, b}) => {console.log(a, b)});

*output*
 B A

✔ combineLatest – When any observable emits a value, emit the latest value from each. combineLatest returns an Observable that produces a new value every time, once all input observables have produced at least one value.

Be aware that combineLatest will not emit an initial value until each observable emits at least one value.

const s1 = new Subject();
const s2 = new Subject();

combineLatest(s1, s2).subscribe(([s1, s2]) => {
  console.log(s1, s2);
});
s1.next("red");
s2.next("red 1");

s2.next("blue");

*output*
red
red 1

red
blue

Running Example:
https://stackblitz.com/edit/fork-join-and-combinelatest?file=index.ts


takeUntil operator

Observables emit values until provided observable emits.

interval(1000).pipe(takeUntil(timer(4000))).subscribe(console.log);

*output*
0 1 2  (so when inner observable executes, it stop emiting value)

takeUntil also used to stop an Observable execution multiple times.

const sub = new Subject<void>();  // create a Void type Subject and pass this in takeUntil operator

interval(1000)
  .pipe(takeUntil(sub))
  .subscribe(console.log);

// sub will be completed Once below timeout will run and it will stop above execution.
setTimeout(() => {
  sub.next();
  sub.complete();
}, 5000);

*output*
0 1 2 3 4 


*************************

Visit Rx JS official site to learn more on operators.
https://www.learnrxjs.io/learn-rxjs/operators

See stackblitz executables examples here:
https://stackblitz.com/edit/rx-js-operators?file=index.ts

Rx JS Top Operators with examples

Leave a Reply

Your email address will not be published.