In RxJS, Observables
are collections of data (similar to arrays) that can produce zero or many values over time, either synchronously or asynchronously. Observables
do not start emitting values until they are subscribed to.
Observable
Every Observable
instance passes through four stages throughout its lifetime:
The four aspects above are all encoded in an
Observable
instance.
We can use new Observable
or a creation operator to create Observables
. The Observable
constructor takes one argument, the subscribe
function.
Let’s look at how to create an Observable
!
const {Observable} = require('rxjs')const wrapArrayIntoObservable = arr => {return new Observable(subscriber => {for(let item of arr) {subscriber.next(item);}});}const data = [1, 2, 3, 4, 5];const observable = wrapArrayIntoObservable(data);observable.subscribe(val => console.log('Subscriber 1: ' + val));observable.subscribe(val => console.log('Subscriber 2: ' + val));
In line #3, we create the wrapArrayIntoObservable()
function, which takes an array as a parameter and wraps that array into an observable
. This function is then passed to the Observable
constructor in line #12 and runs for each subscriber. Finally, in lines 14 and 15, each subscriber
prints the received data stream.
Subscribing an Observable
instance is a way to start an “Observable execution” and deliver values or events to an observer of that execution. Subscribing is totally different from event handler APIs like addEventListener
or removeEventListener
.
In the code example above, the subscriber
function is implemented with the creation of Observable
. In lines #14 and 15, each subscriber
prints the received data stream.
observable.subscribe(val => console.log('Subscriber 1: ' + val));
observable.subscribe(val => console.log('Subscriber 2: ' + val));
With
observable.subscribe()
, the given observer is not registered as a listener in theObservable
. TheObservable
does not maintain a list of attached Observers.
Whenever an Observer is subscribed with the observable
instance, the subscribe
function is executed.
Observable
execution can deliver three types of values:
next()
method, Observable
can push a value to an Observer. That value can be a number, a string, or an object.Observable
can send a JavaScript Error or exception with the error()
method. Once an error occurs, nothing will be sent to the Observer.observable
has been completed successfully, and the subscriptions for that particular observer are completed. Nothing else can be delivered to the observer after the complete()
method is called.In an
Observable
execution,next()
can deliver zero to infinite notifications. If one of the other two notifications is delivered, thennext()
will stop its execution, too.
In the following example of Observable
execution, five “next” notifications are delivered, followed by a “complete” notification.
import { Observable } from 'rxjs';
const obsrInstance = new Observable(function subscribe(subscriber) {
subscriber.next(10);
subscriber.next(20);
subscriber.next(30);
subscriber.next(40);
subscriber.next(50);
subscriber.complete();
});
In the code below, the “next” notification with the value 60
will not be delivered because the observable
is already completed.
import { Observable } from 'rxjs';
const obsrInstance = new Observable(function subscribe(subscriber) {
subscriber.next(10);
subscriber.next(20);
subscriber.next(30);
subscriber.next(40);
subscriber.next(50);
subscriber.complete();
subscriber.next(60); // this will not deliver because Observable has already completed
});
It is always good to wrap your code with a try/catch
block that can throw an error or exception.
import { Observable } from 'rxjs';
const obsrInstance = new Observable(function subscribe(subscriber) {
try {
subscriber.next(10);
subscriber.next(20);
subscriber.next(30);
subscriber.next(40);
subscriber.next(50);
subscriber.complete();
} catch (err) {
subscriber.error(err); // delivers an error if it caught one
}
});
As discussed earlier, Observable
can deliver infinite “next” notifications. To make it finite, we have to make sure that the Observable
is destroyed (unsubscribe).
Eliminating an observable
by withdrawing it from the DOM
will destruct it. RxJS automatically unsubscribes (for asynchronous logic only) Observables
immediately after an error or a complete notification delivers.
We can also manually trigger the unsubscribe
function. The code is given below:
return function unsubscribe() {
clearInterval(obsrInstance);
};