RxJS and Observables


Angular uses RxJS (Reactive Extensions for JavaScript) to handle asynchronous operations. RxJS is a library for working with asynchronous data streams using Observables.

What is RxJS?

RxJS stands for Reactive Extensions for JavaScript. It lets developers work with asynchronous data streams through a concept called Observables. Observables represent a stream of data that can be observed, transformed, and combined with other streams.

They can emit multiple values over time (like a sequence of values), and you can subscribe to them to react to changes.
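As a minimal sketch of that idea, the Observable below emits three values and then completes, and a subscriber collects them. The names numbers$, received, and completed are made up for this illustration.

```typescript
import { Observable } from 'rxjs';

// Hypothetical stream: emits three values synchronously, then completes
const numbers$ = new Observable<number>(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

const received: number[] = [];
let completed = false;

// Nothing is emitted until subscribe() is called
numbers$.subscribe({
  next: value => received.push(value),
  complete: () => {
    completed = true;
  },
});

console.log(received, completed);
```

Each call to subscribe() runs the function passed to the constructor again, so every subscriber gets its own sequence of values.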

Key Concepts in RxJS

Before diving into custom Observables, it’s important to understand some core concepts of RxJS:

  1. Observables: These are data sources that emit values over time. They can represent HTTP requests, user input, time-based events, and more.
  2. Observers: These are the consumers that “watch” the Observable and react to emitted values, errors, and completion.
  3. Subscriptions: A subscription represents the execution of an Observable. It is created when an observer subscribes and is mainly used to cancel that execution by unsubscribing.
  4. Operators: These are functions used to transform, filter, or combine Observables (e.g., map, filter, mergeMap).
  5. Subjects: A special kind of Observable that allows values to be multicast to many Observers.
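The concepts above can be sketched together in a few lines. This is only an illustration: the names source$, evensTimesTen$, first, and second are made up here, and the values are pushed in by hand rather than coming from a real event source.

```typescript
import { Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Subject: an Observable that we can also push values into
const source$ = new Subject<number>();

// Operators: keep even numbers, then multiply them by ten
const evensTimesTen$ = source$.pipe(
  filter(n => n % 2 === 0),
  map(n => n * 10),
);

const first: number[] = [];
const second: number[] = [];

// Two Observers; each subscribe() call returns a Subscription
const subA = evensTimesTen$.subscribe(v => first.push(v));
const subB = evensTimesTen$.subscribe(v => second.push(v));

source$.next(1); // filtered out
source$.next(2); // both observers receive 20

subB.unsubscribe(); // the second observer stops listening

source$.next(4); // only the first observer receives 40
subA.unsubscribe();

console.log(first, second);
```

Because the source is a Subject, both observers see the same emissions while they are subscribed, which is what multicasting means in practice.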

Working with RxJS in Angular

Angular leverages RxJS extensively. When working with Angular, you often encounter RxJS in the form of Observables, especially when dealing with HTTP requests, user events, and state changes. Angular services, like HttpClient, return Observables that you can subscribe to.

Consider this basic example of using RxJS with Angular to make an HTTP request:

import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { catchError } from 'rxjs/operators';

@Injectable({
  providedIn: 'root',
})
export class DataService {
  constructor(private http: HttpClient) {}

  // Returns a cold Observable: the request is only sent once something subscribes
  getData(): Observable<any> {
    return this.http.get('https://api.example.com/data').pipe(
      catchError(error => {
        // Log the failure, then rethrow so subscribers can handle it themselves
        console.error('Error occurred', error);
        throw error;
      })
    );
  }
}
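The catchError handler above logs and rethrows. Another option is to recover with a fallback value so that subscribers never see the error. The sketch below simulates a failing request with a plain Observable instead of HttpClient; failingRequest$ and the empty-array fallback are assumptions made for this illustration.

```typescript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Hypothetical stand-in for an HTTP call that fails immediately
const failingRequest$ = new Observable<string[]>(observer => {
  observer.error(new Error('Network down'));
});

const results: string[][] = [];
let errorSeen = false;

failingRequest$
  .pipe(
    // Replace the error with an empty list instead of rethrowing it
    catchError(() => of([] as string[])),
  )
  .subscribe({
    next: items => results.push(items),
    error: () => {
      errorSeen = true;
    },
  });

console.log(results, errorSeen);
```

Whether to rethrow or fall back depends on the caller: rethrowing keeps failures visible to the component, while a fallback keeps the UI rendering with empty data.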

Custom Observables in RxJS

While Angular provides built-in Observables, such as those returned by HttpClient, there are times when you need to create your own custom Observable to manage complex streams of data. RxJS lets us create Observables with the new Observable() constructor. The older Observable.create() method does the same thing but is deprecated.

Let’s explore how to create a custom Observable, customInterval, that emits values at a regular interval, similar to setInterval but composable with the rest of RxJS.

Creating customInterval Observable

A customInterval Observable is one that emits a value at regular intervals, much like JavaScript’s setInterval(), but it allows for more flexibility and better integration with other Observables.

Here’s how we can implement a customInterval Observable:

import { Observable } from 'rxjs';

function customInterval(period: number): Observable<number> {
  // The explicit type argument documents that this stream emits numbers
  return new Observable<number>(observer => {
    let count = 0;
    const intervalId = setInterval(() => {
      observer.next(count++);
    }, period);

    // Clean up the interval when the subscription is unsubscribed
    return () => {
      clearInterval(intervalId);
    };
  });
}

How it Works

  • Observable Constructor: We use the new Observable() constructor to create a custom Observable. This Observable will emit values at regular intervals.
  • setInterval: Inside the Observable, we use JavaScript’s native setInterval() to emit values periodically.
  • Observer: observer.next(count++) is called at each interval to emit the current value of count and then increment it, so the stream produces 0, 1, 2, and so on.
  • Cleanup: When the subscription is unsubscribed (i.e., when the component or the service no longer needs the stream), we clear the interval with clearInterval() to prevent memory leaks.
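To see the cleanup step in isolation, here is a small sketch with the same shape as customInterval plus a flag that records when the teardown function runs. The names trackedInterval and cleared are made up for this illustration.

```typescript
import { Observable } from 'rxjs';

// Set to true once the teardown function has run
let cleared = false;

// Same structure as customInterval, with the flag added for demonstration
function trackedInterval(period: number): Observable<number> {
  return new Observable<number>(observer => {
    let count = 0;
    const intervalId = setInterval(() => observer.next(count++), period);

    // RxJS calls this function when the subscription is unsubscribed
    return () => {
      clearInterval(intervalId);
      cleared = true;
    };
  });
}

const subscription = trackedInterval(1000).subscribe(value => console.log(value));

// Unsubscribing right away runs the teardown before the first tick
subscription.unsubscribe();

console.log(cleared, subscription.closed);
```

Without the returned function, the interval would keep firing after unsubscribe, which is the leak the cleanup step prevents.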

Using customInterval in Angular

Once the customInterval function is created, you can use it in any Angular component by subscribing to the Observable.

import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription } from 'rxjs';
// Assumed file location for the customInterval function shown earlier; adjust to your project
import { customInterval } from './custom-interval';

@Component({
  selector: 'app-timer',
  template: `
    <div>
      <p>Interval Timer: {{ time }}</p>
    </div>
  `,
})
export class TimerComponent implements OnInit, OnDestroy {
  time = 0;
  // Optional because it is only assigned in ngOnInit
  private intervalSubscription?: Subscription;

  ngOnInit(): void {
    // Subscribe to the customInterval Observable and show the latest count
    this.intervalSubscription = customInterval(1000).subscribe((count) => {
      this.time = count;
    });
  }

  ngOnDestroy(): void {
    // Unsubscribe to stop the interval and avoid memory leaks
    this.intervalSubscription?.unsubscribe();
  }
}
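Storing the Subscription and calling unsubscribe() is one way to clean up. Another common pattern is to end the stream with the takeUntil operator and a Subject that fires on teardown. The sketch below uses a plain class, TimerModel, as a hypothetical stand-in for a component so that it runs without Angular; activeIntervals, destroy$, start(), and stop() are names invented for this illustration.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Counts intervals that are currently running, for demonstration only
let activeIntervals = 0;

function customInterval(period: number): Observable<number> {
  return new Observable<number>(observer => {
    let count = 0;
    activeIntervals++;
    const intervalId = setInterval(() => observer.next(count++), period);
    return () => {
      clearInterval(intervalId);
      activeIntervals--;
    };
  });
}

// Hypothetical stand-in for a component: start() ~ ngOnInit, stop() ~ ngOnDestroy
class TimerModel {
  time = 0;
  private readonly destroy$ = new Subject<void>();

  start(): void {
    customInterval(1000)
      .pipe(takeUntil(this.destroy$)) // completes when destroy$ emits
      .subscribe(count => {
        this.time = count;
      });
  }

  stop(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const timer = new TimerModel();
timer.start();
const activeWhileRunning = activeIntervals;
timer.stop();
const activeAfterStop = activeIntervals;

console.log(activeWhileRunning, activeAfterStop);
```

With this approach a single destroy$ Subject can end any number of streams, so there is no need to track each Subscription separately.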