How to create and handle Observables in RxJS

In RxJS, Observables are lazy collections of values (similar to arrays) that can produce zero or more values over time, either synchronously or asynchronously. An Observable does not start emitting values until it is subscribed to.

The lifecycle of an Observable

Every Observable instance passes through four stages throughout its lifetime:

  • Creation
  • Subscription
  • Execution
  • Destruction

The four aspects above are all encoded in an Observable instance.

Creation

We can use new Observable or a creation operator to create Observables. The Observable constructor takes one argument, the subscribe function.

Let’s look at how to create an Observable!

main.js
const { Observable } = require('rxjs');

const wrapArrayIntoObservable = arr => {
  return new Observable(subscriber => {
    for (let item of arr) {
      subscriber.next(item);
    }
  });
};

const data = [1, 2, 3, 4, 5];
const observable = wrapArrayIntoObservable(data);

observable.subscribe(val => console.log('Subscriber 1: ' + val));
observable.subscribe(val => console.log('Subscriber 2: ' + val));

Code description

The wrapArrayIntoObservable() function takes an array as a parameter and returns an Observable that emits each element of that array. The function passed to the Observable constructor is the subscribe function, and it runs once for each subscriber. Finally, the two subscribe() calls each print every value they receive. Because the values are emitted synchronously, Subscriber 1 logs all five values before Subscriber 2 logs its own five.

Subscription

Subscribing to an Observable instance starts an “Observable execution” and delivers values or events to an observer of that execution. Subscribing is very different from event handler APIs like addEventListener or removeEventListener.

In the code example above, the subscribe function is defined when the Observable is created. Each of the two subscribe() calls below runs it and prints the received values.

observable.subscribe(val => console.log('Subscriber 1: ' + val));
observable.subscribe(val => console.log('Subscriber 2: ' + val));

With observable.subscribe(), the given observer is not registered as a listener in the Observable. The Observable does not maintain a list of attached Observers.

Observables produce a sequence of values over time, often called a "stream", and push those values to observers, which serve as consumers.

Execution

Each time an Observer subscribes to the Observable instance, the subscribe function is executed.

An Observable execution can deliver three types of notifications:

  • “Next” notification: With the next() method, an Observable pushes a value to an Observer. That value can be a number, a string, an object, or any other JavaScript value.
  • “Error” notification: An Observable can send a JavaScript Error or exception with the error() method. Once an error is delivered, nothing else will be sent to the Observer.
  • “Complete” notification: The complete() method signals that the Observable has finished successfully. It carries no value, and nothing else can be delivered to the observer after it is called.

In an Observable execution, zero to infinitely many “next” notifications can be delivered. Once an error or complete notification is delivered, the execution ends and no further “next” notifications are delivered.

In the following example of Observable execution, five “next” notifications are delivered, followed by a “complete” notification.

import { Observable } from 'rxjs';

const obsrInstance = new Observable(function subscribe(subscriber) {
 subscriber.next(10);
 subscriber.next(20);
 subscriber.next(30);
 subscriber.next(40);
 subscriber.next(50);
 
 subscriber.complete();
});

In the code below, the “next” notification with the value 60 will not be delivered because the observable is already completed.

import { Observable } from 'rxjs';

const obsrInstance = new Observable(function subscribe(subscriber) {
 subscriber.next(10);
 subscriber.next(20);
 subscriber.next(30);
 subscriber.next(40);
 subscriber.next(50);
 
 subscriber.complete();

 subscriber.next(60); // this will not deliver because Observable has already completed
});

It is good practice to wrap any code inside the subscribe function that might throw in a try/catch block, so that the exception is delivered to the observer as an error notification.

import { Observable } from 'rxjs';
 
const obsrInstance = new Observable(function subscribe(subscriber) {
  try {
    subscriber.next(10);
    subscriber.next(20);
    subscriber.next(30);
    subscriber.next(40);
    subscriber.next(50);

    subscriber.complete();
  } catch (err) {
    subscriber.error(err); // delivers an error if it caught one
  }
});

Destruction

As discussed earlier, an Observable can deliver an unlimited number of “next” notifications. To stop it, the Observable execution must be ended (unsubscribed) so that the resources it holds are released.

An Observable execution is not tied to the DOM. It ends when the Observable delivers an error or complete notification, or when the consumer unsubscribes. After an error or a complete notification is delivered, RxJS disposes of the execution automatically and runs any teardown logic.

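We can also end an execution manually by calling unsubscribe() on the Subscription object that subscribe() returns. To release resources at that point, the subscribe function can return a teardown function, as in the fragment below:

// Returned from inside the subscribe function; intervalId is the ID returned by setInterval
return function unsubscribe() {
  clearInterval(intervalId);
};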

Copyright ©2024 Educative, Inc. All rights reserved