What is the RxJS Retry Operator and how does it work?

The retry operator works by re-subscribing to the source observable when it errors, until the allowed attempts are exhausted.

Let’s break down the above and translate it to human language.

For that we’ll do the following:

  • start with an Observable refresher
  • look at a basic use case of the retry operator
  • recreate the logic of the retry operator
  • see what happens when the attempts run out

Observable

For a quick refresher on Observable - it has a subscribe method that accepts 3 callbacks:

  • next - called in case of emission
  • error - called in case of an error
  • complete - called when the observable chain has completed

This next code snippet demonstrates an Observable that emits once and then completes:

main.js
const { of } = require('rxjs');

// of() emits its argument once, then completes
const f = of('emission');

f.subscribe(
  x => console.log(x),                // next
  e => { /* error case */ },          // error (receives only the error)
  () => { console.log('complete'); }  // complete
);

main.js
const { of } = require('rxjs');

const f = of('emission');

// the same three callbacks, grouped into one subscriber object
const subscriber = {
  next: x => console.log(x),
  error: e => { /* error case */ },
  complete: () => { console.log('complete'); }
};

f.subscribe(subscriber);

The second snippet shows that a subscriber object can be passed to the subscribe method instead of separate callbacks, and it works the same way.
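
Neither snippet above exercises the error callback, which is the one retry cares about. As a rough sketch of that path, the example below uses a hand-written stand-in for an observable (the names failing and received are made up for illustration, and the object is not an RxJS Observable), so it runs without installing anything. It assumes only that an observable is something with a subscribe method that calls the subscriber's callbacks.

```javascript
// Hypothetical stand-in for an observable that fails. It is NOT an RxJS class,
// just an object with a subscribe method, so the sketch has no dependencies.
const failing = {
  subscribe(observer) {
    observer.next('emission');
    observer.error(new Error('boom'));
    // Nothing is delivered after an error, so complete is never called here.
  },
};

const received = [];
failing.subscribe({
  next: x => received.push(`next: ${x}`),
  error: e => received.push(`error: ${e.message}`),
  complete: () => received.push('complete'),
});

console.log(received); // [ 'next: emission', 'error: boom' ]
```

The point to take away is that an error ends the subscription without a complete, which is why any retry has to happen from inside the error callback.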

retry basics

The retry operator will retry the observable in case of an error.

Take a look at the code below for an example:

main.js
const { of, throwError } = require('rxjs');
const { retry, switchMap } = require('rxjs/operators');

let calls = 0;

// errors on the first subscription only, then emits normally
const throwsErrorFirstEmission = of('emission').pipe(
  switchMap(e => {
    if (calls === 0) {
      calls += 1;
      console.log('throwing error!');
      return throwError('e');
    }
    return of(e);
  })
);

throwsErrorFirstEmission
  .pipe(retry(1))
  .subscribe(
    x => console.log(x),
    e => { console.log(e); },
    () => { console.log('complete'); }
  );

So, what happens here?

By passing retry(1) to the pipe method, we declare that we’d like to retry this observable chain once, making the maximum number of attempts 2: 1 initial attempt and 1 retry.

The retry operator subscribes to the source observable when subscribe is called and listens for the error case. If an error arrives, it re-subscribes to the source, unless the number of retries already made equals the provided value. In that case, the error is passed on to the subscriber.

Break it down

If we were to unwrap the retry operator in code, it would look roughly like this:

main.js
const { of, throwError } = require('rxjs');
const { switchMap } = require('rxjs/operators');

let calls = 0;

const throwsErrorFirstEmission = of('emission').pipe(
  switchMap(e => {
    if (calls === 0) {
      calls += 1;
      console.log('throwing error!');
      return throwError('e');
    }
    return of(e);
  })
);

// want to reuse the subscriber but decrease the attempts
function subscriber(upTo) {
  return {
    next: x => console.log(x),
    error: retryUpTo(upTo),
    complete: () => { console.log('complete'); }
  };
}

// builds the error handler: re-subscribes while attempts remain
const retryUpTo = (upTo) => {
  return (error) => {
    // if there are attempts left - retry!
    if (upTo > 0) {
      throwsErrorFirstEmission.subscribe(subscriber(upTo - 1));
    } else {
      // no attempts left - pass on the error
      throw error;
    }
  };
};

throwsErrorFirstEmission
  .subscribe(subscriber(1));

Sorry about the convoluted code. Let’s try to break it down:

The function returned by retryUpTo gets called if the observable chain fails. It then does the following:

  • while there are attempts left
  • re-subscribe to the observable
  • decrease the attempts
  • recursively call itself in the case of error
  • pass on the error if no more attempts
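
The steps above can also be sketched as one reusable function. This is a minimal illustration, not how RxJS implements retry: subscribeWithRetry, flaky, subscriptions, and events are hypothetical names, and the demo source is a hand-written object rather than an RxJS Observable so that the sketch runs without dependencies. Since the helper relies only on subscribe accepting a subscriber object, it should accept an RxJS observable as well.

```javascript
// Hypothetical helper (not part of RxJS): subscribe to `source`, and on error
// re-subscribe up to `retries` more times before handing the error on.
function subscribeWithRetry(source, retries, observer) {
  source.subscribe({
    next: value => observer.next(value),
    error: err => {
      if (retries > 0) {
        // attempts left: re-subscribe with one fewer retry
        subscribeWithRetry(source, retries - 1, observer);
      } else {
        // no attempts left: pass the error on
        observer.error(err);
      }
    },
    complete: () => observer.complete(),
  });
}

// Stand-in source for the demo: fails on its first subscription only.
let subscriptions = 0;
const flaky = {
  subscribe(observer) {
    subscriptions += 1;
    if (subscriptions === 1) {
      observer.error('e');
    } else {
      observer.next('emission');
      observer.complete();
    }
  },
};

const events = [];
subscribeWithRetry(flaky, 1, {
  next: x => events.push(x),
  error: e => events.push(`error: ${e}`),
  complete: () => events.push('complete'),
});

console.log(subscriptions, events); // 2 [ 'emission', 'complete' ]
```

The source is subscribed to twice (1 initial attempt and 1 retry), and the subscriber never sees the first error.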

That’s what the retry operator does!

What happens if the attempts are over but the observable still throws?

Then the retry will pass on the error.

For simplicity, we just use console.error here:

main.js
const { of, throwError } = require('rxjs');
const { switchMap } = require('rxjs/operators');

let calls = 0;

const throwsErrorFirstEmission = of('emission').pipe(
  switchMap(e => {
    if (calls === 0 || calls === 1) {
      calls += 1;
      console.log('throwing error!');
      return throwError('e');
    }
    return of(e);
  })
);

// want to reuse the subscriber but decrease the attempts
function subscriber(upTo) {
  return {
    next: x => console.log(x),
    error: retryUpTo(upTo),
    complete: () => { console.log('complete'); }
  };
}

const retryUpTo = (upTo) => {
  return (error) => {
    // if there are attempts left - retry!
    if (upTo > 0) {
      throwsErrorFirstEmission.subscribe(subscriber(upTo - 1));
    } else {
      console.error(error);
    }
  };
};

throwsErrorFirstEmission
  .subscribe(subscriber(1));

If we switch to using the actual retry operator, the behavior is the same:

main.js
const { of, throwError } = require('rxjs');
const { retry, switchMap } = require('rxjs/operators');

let calls = 0;
let allowed = 2;

const throwsErrorTwice = of('emission').pipe(
  switchMap(e => {
    if (calls < allowed) {
      calls += 1;
      console.log('throwing error!');
      return throwError('e');
    }
    return of(e);
  })
);

throwsErrorTwice
  .pipe(retry(1))
  .subscribe(
    x => console.log(x),
    (e) => console.error(e),
    () => { console.log('complete'); }
  );
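
To compare the two outcomes side by side (retries sufficient versus retries exhausted), here is a rough operator-shaped sketch. It is an illustration under stated assumptions rather than RxJS's actual retry: retryLike, failsTimes, and run are hypothetical names, and the sources are plain objects with a subscribe method so the example runs without dependencies.

```javascript
// Hypothetical operator-shaped helper (not RxJS's retry): takes a retry count
// and returns a function that wraps a source in a new subscribable object.
const retryLike = count => source => ({
  subscribe(observer) {
    let retriesLeft = count;
    const attempt = () =>
      source.subscribe({
        next: value => observer.next(value),
        error: err => {
          if (retriesLeft > 0) {
            retriesLeft -= 1;
            attempt(); // re-subscribe to the source
          } else {
            observer.error(err); // exhausted: the error reaches the subscriber
          }
        },
        complete: () => observer.complete(),
      });
    attempt();
  },
});

// Stand-in source that fails on its first `failures` subscriptions.
const failsTimes = failures => {
  let calls = 0;
  return {
    subscribe(observer) {
      if (calls < failures) {
        calls += 1;
        observer.error('e');
      } else {
        observer.next('emission');
        observer.complete();
      }
    },
  };
};

// Runs one scenario and returns what the subscriber saw.
const run = (failures, retries) => {
  const log = [];
  retryLike(retries)(failsTimes(failures)).subscribe({
    next: x => log.push(x),
    error: e => log.push(`error: ${e}`),
    complete: () => log.push('complete'),
  });
  return log;
};

console.log(run(1, 1)); // [ 'emission', 'complete' ]
console.log(run(2, 1)); // [ 'error: e' ]
```

With one failure and one retry the subscriber sees only the emission. With two failures and one retry, the second error is passed on, which mirrors the throwsErrorTwice example above.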

Summary

We implemented logic to do the following:

  • subscribe to an observable that throws
  • re-subscribe to it on error
  • log the value of the emission

We then showed that switching to the actual retry operator retains the behavior of that logic.

The retry operator works by re-subscribing to the source observable when it errors, until the allowed attempts are exhausted.

Hopefully, that makes more sense now.

If not, go back and read it again :)

Thanks for reading!

Copyright ©2024 Educative, Inc. All rights reserved