Data Sources and Observables
Get to know the essential components of reactive programming, including streams and observables.
We'll cover the following...
Data sources
A data source can be defined as an actor that produces data over time, or simply stores data. This actor can be any of the following:
- An external device like a mouse or keyboard, which can (by a human) trigger events over time
- An HTTP call that is expected to return a value
- A component that holds an array of data elements
Even though they are not the same, the reactive paradigm treats any of these data sources in the same way, through the concept of stream, which can be defined as a data pipeline.
Producer-consumer approach
The entire paradigm is based on the producer-consumer approach, where a producer can send data to the data pipeline, i.e., stream, where the data can be transformed until it reaches one or more consumers.
For a consumer to be able to receive data from a producer, it must notify the producer that it’s interested in the data produced by the producer, i.e., subscribe to it. The following image illustrates this concept:
Producer-Consumer approach
A producer in RxJS is an Observable. It is a component that simply produces data over time, regardless of that being data generated from mouse or keyboard events, or stored in an array.
A consumer in RxJS will subscribe to the Observable in order to listen to all the data emitted by that Observable, but the data is not always in its original form as it can be transformed while flowing through the Data Pipeline.
The data pipeline is the place where all the fun stuff happens.
Data transformation
As the data flows through the data pipeline, several transformations can be applied to it before it reaches the consumers. Those transformations include filtering elements, mapping elements to different forms, ignoring specific elements, or even delaying the delivery of the elements.
Imagine what we can do with all that flexibility!
The following diagram illustrates the data transformation process with some example functions that can be applied to the data elements.
Sample data pipeline
Creating Observables
Before we dive deeper into the lessons, let’s see the inner working of an Observable. The following code snippet wraps an array into an observable and produces values synchronously by iterating over the array.
const {Observable} = require('rxjs')const wrapArrayIntoObservable = arr => {return new Observable(subscriber => {for(let item of arr) {subscriber.next(item);}});}const data = [1, 2, 3, 4, 5];const observable = wrapArrayIntoObservable(data);observable.subscribe(val => console.log('Subscriber 1: ' + val));observable.subscribe(val => console.log('Subscriber 2: ' + val));
Note that the function that is passed to the Observable constructor; this function is run for each subscriber that subscribes to this observable.
Furthermore, RxJS offers simpler methods for Observable creation, like of, from, interval, etc. In the following section, we are using from to create a synchronous data source.
Creating a data pipeline in RxJS
Let’s get our hands dirty by creating data pipelines and implementing multiple subscribers that consume that data.
const { from } = require('rxjs');const { tap, filter, map } = require('rxjs/operators');const arrayDataObservable$ = from([1, 2, 3, 4, 5]);const dataPipeline = arrayDataObservable$.pipe(tap(val => console.log('Value passing through the stream: ' + val)),filter(val => val > 2),map(val => val * 2))const subscribeToBaseObservable = subscriberName => {return arrayDataObservable$.subscribe(val => {console.log(subscriberName + ' received: ' + val);})}const subscribeToDataPipeline = subscriberName => {return dataPipeline.subscribe(val => {console.log(subscriberName + ' received: ' + val);})}const handleSubscriptionToBaseObservable = () => {const subscription1 = subscribeToBaseObservable('Subscriber1');const subscription2 = subscribeToBaseObservable('Subscriber2');}const handleSubscriptionToDataPipeline = () => {const subscription1 = subscribeToDataPipeline('Subscriber1');const subscription2 = subscribeToDataPipeline('Subscriber2');}// 1. Execute this function first// handleSubscriptionToBaseObservable();// 2. Execute this function nexthandleSubscriptionToDataPipeline();
The example above indicates two use cases:
- Subscribing to a raw observable, where no data transformation happens
- Subscribing to a data pipeline that actually transforms data into something the subscriber is interested in
Try executing one function at a time, and you would notice the difference. Every interested subscriber in the raw observable can subscribe to it and receive all the data that it produces over time. On the other hand, other subscribers might be interested only in values larger than two from the original data. In the meantime, the end-user would be notified (via tap) that some data transforming operations are taking place.
Notice that the following output is produced when both subscribers are subscribed to the data pipeline.
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber1 received: 6
Value passing through the stream: 4
Subscriber1 received: 8
Value passing through the stream: 5
Subscriber1 received: 10
Value passing through the stream: 1
Value passing through the stream: 2
Value passing through the stream: 3
Subscriber2 received: 6
Value passing through the stream: 4
Subscriber2 received: 8
Value passing through the stream: 5
Subscriber2 received: 10
- The observable starts producing values only when an interested party subscribes to it.
- Each new subscriber gets the entire data emitted from the Observable.
- The data moves through each step of the pipeline until it reaches the subscriber.
- Subscriber 2 retrieves the values only after Subscriber 1 has retrieved and processed values from the pipeline.
The last point might sound strange, but it happens since the original data source is synchronous.
We explain the difference between synchronous and asynchronous data sources in detail in the next lesson.
Observables basics
How are Observables evaluated?
Eagerly
Lazily