w3resource

The RxJS library

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change.

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code (RxJS Docs).

RxJS provides an implementation of the Observable type, which is needed until the type becomes part of the language and until browsers support it. The library also provides utility functions for creating and working with observables. These utility functions can be used for:

  • Converting existing code for async operations into observables
  • Iterating through the values in a stream
  • Mapping values to different types
  • Filtering streams
  • Composing multiple streams
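
As a rough illustration of the list above, the sketch below composes two streams, filters the result, maps numbers to strings, and iterates over the values by subscribing. The variable names and sample values are assumptions made for this example only.

```typescript
import { concat, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Two small synchronous streams (illustrative values)
const firstNums = of(1, 2, 3);
const secondNums = of(4, 5, 6);

// Compose the streams, keep the even numbers,
// then map each number to a string label
const labelStream = concat(firstNums, secondNums).pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => `item-${n}`)
);

// Iterate through the values by subscribing
const labels: string[] = [];
labelStream.subscribe(label => labels.push(label));

console.log(labels); // ["item-2", "item-4", "item-6"]
```

Because of() emits synchronously, the labels array is already filled when console.log runs.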

Observable creation functions

RxJS offers a number of functions for creating new observables. These functions simplify the process of creating observables from sources such as events, timers, and promises. For example:

Creating an observable from a promise

TypeScript Code:

import { from } from 'rxjs';

// Create an Observable out of a promise
const data = from(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
 next(response) { console.log(response); },
 error(err) { console.error('Error: ' + err); },
 complete() { console.log('Completed'); }
});


Creating an observable from a counter

TypeScript Code:

import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
  console.log(`It's been ${n} seconds since subscribing!`));


Creating an observable from an event

TypeScript Code:

import { fromEvent } from 'rxjs';

// The non-null assertion assumes an element with this id exists on the page
const el = document.getElementById('my-element')!;

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent<MouseEvent>(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});


Creating an observable from an AJAX request

TypeScript Code:

import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));


Operators

Operators are functions that build on the observables foundation to enable sophisticated manipulation of collections. For example, RxJS defines operators such as map(), filter(), concat(), and mergeMap() (also exported under the deprecated alias flatMap()).

Operators take configuration options, and they return a function that takes a source observable. When executing this returned function, the operator observes the source observable’s emitted values, transforms them, and returns a new observable of those transformed values. Here is a simple example:

Map operator

TypeScript Code:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const nums = of(1, 2, 3);

const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

// Logs
// 1
// 4
// 9
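
To make the "configuration in, function out" shape more concrete, here is a minimal sketch of a hand-written operator. The name multiplyBy is hypothetical and not part of RxJS; it only wraps the existing map operator.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical helper (not an RxJS export): takes a configuration value
// and returns a function from a source observable to a new observable
function multiplyBy(factor: number) {
  return (source: Observable<number>): Observable<number> =>
    source.pipe(map(val => val * factor));
}

const sourceNums = of(1, 2, 3);

// Configure the operator, then apply the returned function to the source
const triple = multiplyBy(3);
const tripled: number[] = [];
triple(sourceNums).subscribe(x => tripled.push(x));

console.log(tripled); // [3, 6, 9]
```

A function with this shape can also be passed directly to the pipe() method of an observable, for example sourceNums.pipe(multiplyBy(3)).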


You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.

A set of operators applied to an observable is a recipe—that is, a set of instructions for producing the values you’re interested in. By itself, the recipe doesn’t do anything. You need to call subscribe() to produce a result through the recipe.
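
The recipe idea can be checked with a small sketch. It assumes a counter variable (sideEffects, a name chosen for this example) that the tap operator increments each time a value passes through.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

let sideEffects = 0;

// Build the recipe: count each value, then multiply it by 10
const recipe = of(1, 2, 3).pipe(
  tap(() => { sideEffects++; }),
  map(n => n * 10)
);

// Nothing has run yet, because nobody has subscribed
const countBeforeSubscribe = sideEffects;

// Subscribing executes the recipe
const values: number[] = [];
recipe.subscribe(v => values.push(v));
const countAfterSubscribe = sideEffects;

console.log(countBeforeSubscribe, countAfterSubscribe, values);
// 0 3 [10, 20, 30]
```

Each additional subscribe() call would run the recipe again from the start, so the counter would grow by three per subscription.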

As an example, consider the pipe function below:

TypeScript Code:

import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const nums = of(1, 2, 3, 4, 5);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));


The code above can be shortened because pipe() is also a method on the RxJS Observable, as shown below:

TypeScript Code:

import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));


Error handling

In addition to the error() handler that you provide on subscription, RxJS provides the catchError operator that lets you handle known errors in the observable recipe.

For instance, suppose you have an observable that makes an API request and maps to the response from the server. If the server returns an error or the value doesn’t exist, an error is produced. If you catch this error and supply a default value, your stream continues to process values rather than erroring out.

The following example uses the catchError operator to do this:

TypeScript Code:

import { ajax } from 'rxjs/ajax';
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
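
The same behaviour can be tried without a server. The sketch below is a simplified stand-in for the ajax example: it assumes a synchronous source where the value 0 plays the role of a missing response, and it uses -1 as an arbitrary default value.

```typescript
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';

// Stand-in for the API call: 0 represents a missing value (an assumption)
const checked = of(1, 2, 0, 4).pipe(
  map(val => {
    if (!val) {
      throw new Error('Value expected!');
    }
    return val;
  })
);

// Without catchError, the error() handler of the subscriber runs
const uncaughtLog: string[] = [];
checked.subscribe({
  next: x => uncaughtLog.push(`data: ${x}`),
  error: (err: Error) => uncaughtLog.push(`error: ${err.message}`)
});

// With catchError, a default value is emitted and the stream completes
const caughtLog: string[] = [];
checked.pipe(catchError(() => of(-1))).subscribe({
  next: x => caughtLog.push(`data: ${x}`),
  error: () => caughtLog.push('error'),
  complete: () => caughtLog.push('complete')
});

console.log(uncaughtLog); // ["data: 1", "data: 2", "error: Value expected!"]
console.log(caughtLog);   // ["data: 1", "data: 2", "data: -1", "complete"]
```

Note that the value 4 never arrives in either case. Once the source has errored it stops, and catchError continues with the replacement observable rather than resuming the original one.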


Retry failed observable

Where the catchError operator provides a simple path of recovery, the retry operator lets you retry a failed request.

Use the retry operator before the catchError operator. It resubscribes to the original source observable, which can then re-run the full sequence of actions that resulted in the error. If this includes an HTTP request, it will retry that HTTP request.

The following converts the ajax example from the error handling section to retry the request before catching the error:

TypeScript Code:

import { ajax } from 'rxjs/ajax';
import { of } from 'rxjs';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
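
To see the resubscription without making HTTP requests, here is a minimal sketch built on a hypothetical flaky source. The attempts counter and the rule "fail on the first two subscriptions" are assumptions made purely for illustration.

```typescript
import { Observable, of } from 'rxjs';
import { retry, catchError } from 'rxjs/operators';

let attempts = 0;

// Hypothetical flaky source: errors on the first two subscriptions,
// then emits a value and completes on the third
const flaky = new Observable<string>(subscriber => {
  attempts++;
  if (attempts < 3) {
    subscriber.error(new Error(`attempt ${attempts} failed`));
  } else {
    subscriber.next('ok');
    subscriber.complete();
  }
});

const received: string[] = [];
flaky.pipe(
  retry(3), // Resubscribe to the source up to 3 times on error
  catchError(() => of('fallback')) // Only reached if every retry fails
).subscribe(v => received.push(v));

console.log(attempts, received); // 3 ["ok"]
```

Each retry runs the subscribe function of the source again, which is why the counter reaches 3. The fallback value is never used here because the third attempt succeeds.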

