16.6 RxJS & Reactive Programming — Observable, Subject, Key Operators

What Is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for working with asynchronous data streams. It implements the reactive programming paradigm — "treating everything as a stream" — in JavaScript.

Angular uses RxJS in its core features: HttpClient, Router, Forms, and more.

Reactive Programming Concept

Traditional approach:
Request data → Wait → Response → Process

Reactive approach:
Subscribe to data stream → Automatically process data as it arrives
(Click events, HTTP responses, timers, user input — all unified as streams)

Observable vs Promise

| Aspect | Observable | Promise |
|---|---|---|
| Data | Multiple values (stream) | Single value |
| Execution | Lazy (starts on subscribe) | Eager (starts immediately) |
| Cancellation | Possible (unsubscribe) | Not possible |
| Operators | 100+ rich operators | then/catch/finally |
| Sync support | Yes | No |
| Use case | Events, HTTP, realtime | One-time async |

import { Observable, of, from } from 'rxjs';
import { map, filter } from 'rxjs/operators';

// Create an Observable directly
const observable$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

// Subscribe (starts execution)
const subscription = observable$.subscribe({
  next: value => console.log('Value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed!')
});

// Unsubscribe to stop receiving values and release resources (prevents memory leaks).
// This Observable completes synchronously, so here the call is a harmless no-op.
subscription.unsubscribe();
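
The Execution and Cancellation rows of the comparison can be checked without RxJS at all. The sketch below is a toy model, not RxJS's implementation: ToyObservable, Observer, Teardown, and log are hypothetical names invented for illustration. It shows that a Promise executor runs at construction time, while an Observable-style producer runs only on subscribe and hands back a teardown function that unsubscribe calls.

```typescript
const log: string[] = [];

// Eager: the Promise executor runs as soon as the Promise is constructed.
const eager = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(1);
});

// Toy model of an Observable (hypothetical, for illustration only).
type Observer<T> = { next: (value: T) => void };
type Teardown = () => void;

class ToyObservable<T> {
  private readonly producer: (observer: Observer<T>) => Teardown;

  constructor(producer: (observer: Observer<T>) => Teardown) {
    this.producer = producer;
  }

  // Lazy: the producer runs only here, once per subscriber.
  subscribe(observer: Observer<T>): { unsubscribe: Teardown } {
    return { unsubscribe: this.producer(observer) };
  }
}

const lazy$ = new ToyObservable<number>(observer => {
  log.push('observable producer ran');
  observer.next(1);
  observer.next(2);
  return () => { log.push('teardown ran'); };
});

// Snapshot before anyone subscribes: only the Promise has done any work.
const logBeforeSubscribe = [...log];

const toySubscription = lazy$.subscribe({ next: value => log.push(`value ${value}`) });
toySubscription.unsubscribe(); // runs the teardown returned by the producer

void eager; // constructed only to show that its executor runs eagerly
console.log(logBeforeSubscribe); // ['promise executor ran']
console.log(log);
// ['promise executor ran', 'observable producer ran', 'value 1', 'value 2', 'teardown ran']
```

A real RxJS Observable adds error and completion channels and stops delivering values after unsubscribe; the toy keeps only the two properties the table contrasts.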

Subject Types

A Subject is both an Observable and an Observer. It multicasts values to multiple subscribers.

Subject — Basic

import { Subject } from 'rxjs';

const subject$ = new Subject<string>();

// Two subscribers
subject$.subscribe(v => console.log('Subscriber 1:', v));
subject$.subscribe(v => console.log('Subscriber 2:', v));

subject$.next('Hello'); // Both subscribers receive it
subject$.next('World');
// Subscriber 1: Hello / Subscriber 2: Hello
// Subscriber 1: World / Subscriber 2: World

// Subscribers added after a value is emitted don't receive past values
subject$.subscribe(v => console.log('Subscriber 3:', v)); // Receives only future values

BehaviorSubject — Holds Current Value

import { BehaviorSubject } from 'rxjs';

// Initial value required
const count$ = new BehaviorSubject<number>(0);

// Even late subscribers immediately receive the current value (0)
count$.subscribe(v => console.log('Current count:', v)); // Immediately prints 0

count$.next(1); // → 1
count$.next(2); // → 2

// Read current value synchronously
console.log(count$.getValue()); // 2

// Commonly used for state management in services.
// (Injectable is imported from '@angular/core'; CartItem is an app-specific type.)
@Injectable({ providedIn: 'root' })
export class CartService {
  private cartItems$ = new BehaviorSubject<CartItem[]>([]);

  // Expose as read-only Observable
  readonly items$ = this.cartItems$.asObservable();

  addItem(item: CartItem) {
    const current = this.cartItems$.getValue();
    this.cartItems$.next([...current, item]);
  }
}

ReplaySubject — Replay Last N Values

import { ReplaySubject } from 'rxjs';

// Buffer the last 3 values
const replay$ = new ReplaySubject<number>(3);

replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);

// Late subscribers receive the last 3 values (2, 3, 4)
replay$.subscribe(v => console.log(v)); // 2, 3, 4

Observable Creation Operators

import { of, from, interval, timer, fromEvent, EMPTY, NEVER } from 'rxjs';

// of: Observable from synchronous values
of(1, 2, 3).subscribe(console.log); // 1, 2, 3

// from: Array, Promise, Iterable → Observable
from([1, 2, 3]).subscribe(console.log);
from(fetch('/api/users')).subscribe(console.log);
from(Promise.resolve('hello')).subscribe(console.log);

// interval: Emit values periodically (0, 1, 2, ...)
interval(1000).subscribe(n => console.log(`${n} seconds elapsed`));

// timer: Emit once after delay, or periodically after delay
timer(3000).subscribe(() => console.log('Runs after 3 seconds'));
timer(0, 1000).subscribe(n => console.log(`${n}s`)); // Starts immediately, every second

// fromEvent: Convert DOM events to Observable
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(event => console.log('Clicked!', event));

// EMPTY: Completes immediately
EMPTY.subscribe({ complete: () => console.log('Completed') });

// NEVER: Emits nothing and never completes

Key Transformation Operators

map — Transform

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(n => n * 2)
).subscribe(console.log); // 2, 4, 6

// Object transformation
this.http.get<ApiResponse>('/api/users').pipe(
  map(response => response.data),
  map(users => users.filter(u => u.active))
).subscribe(users => this.users.set(users));

switchMap (cancel the previous inner Observable, switch to the latest)

import { switchMap, debounceTime } from 'rxjs/operators';

searchControl.valueChanges.pipe(
  debounceTime(300),
  switchMap(term =>
    // Cancels previous request and switches to the new one
    this.http.get<Product[]>(`/api/products?q=${term}`)
  )
).subscribe(products => this.results.set(products));

mergeMap — Process Multiple Inner Observables Concurrently

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Upload multiple files simultaneously
// (a second argument, e.g. mergeMap(fn, 3), caps how many run at once)
from(selectedFiles).pipe(
  mergeMap(file => this.uploadService.upload(file))
).subscribe(result => console.log('Upload complete:', result.name));

concatMap — Process in Order (wait for completion)

import { from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Guarantee request order (start next after previous completes)
from(orderedRequests).pipe(
  concatMap(req => this.http.post('/api/process', req))
).subscribe(result => console.log('Processed:', result));

exhaustMap — Ignore New Requests While In Progress (prevent duplicate submit)

import { fromEvent } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

// Button click: ignore additional clicks while processing
fromEvent(submitButton, 'click').pipe(
  exhaustMap(() => this.http.post('/api/order', orderData))
).subscribe(result => console.log('Order complete:', result));

Comparison of flatMap Variants

| Operator | Behavior | Use Case |
|---|---|---|
| switchMap | Cancel previous, keep latest | Real-time search, autocomplete |
| mergeMap | Concurrent execution, order irrelevant | Parallel file uploads |
| concatMap | Sequential, one at a time | Order-sensitive operations |
| exhaustMap | Ignore if in progress | Prevent duplicate form submissions |
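
To make the four behaviors concrete, here is a simplified timeline model in plain TypeScript. It is not RxJS code: Job, Strategy, and simulate are hypothetical names, and the model assumes each inner task emits exactly one result when it finishes and never errors. Given when each outer value arrives and how long its inner work takes, it returns the ids whose results would be emitted, in emission order.

```typescript
// One outer value: when it arrives and how long its inner work takes (ms).
interface Job { id: string; arrivesAt: number; duration: number }

type Strategy = 'switchMap' | 'mergeMap' | 'concatMap' | 'exhaustMap';

// Returns the ids whose results are emitted, in emission order (simplified model).
function simulate(strategy: Strategy, jobs: Job[]): string[] {
  const sorted = [...jobs].sort((a, b) => a.arrivesAt - b.arrivesAt);

  switch (strategy) {
    case 'mergeMap':
      // Every job starts on arrival; results come out in finish order.
      return sorted
        .map(job => ({ id: job.id, finishesAt: job.arrivesAt + job.duration }))
        .sort((a, b) => a.finishesAt - b.finishesAt)
        .map(job => job.id);

    case 'concatMap':
      // Jobs queue up and run one at a time, so results keep arrival order.
      return sorted.map(job => job.id);

    case 'switchMap':
      // A job is cancelled if the next one arrives before it finishes.
      return sorted
        .filter((job, i) => {
          const next = sorted[i + 1];
          return next === undefined || job.arrivesAt + job.duration < next.arrivesAt;
        })
        .map(job => job.id);

    case 'exhaustMap': {
      // A job is ignored if it arrives while an accepted job is still running.
      const emitted: string[] = [];
      let busyUntil = -Infinity;
      for (const job of sorted) {
        if (job.arrivesAt >= busyUntil) {
          emitted.push(job.id);
          busyUntil = job.arrivesAt + job.duration;
        }
      }
      return emitted;
    }
  }
}

const jobs: Job[] = [
  { id: 'A', arrivesAt: 0, duration: 300 },   // slow
  { id: 'B', arrivesAt: 100, duration: 100 }, // arrives while A is running
  { id: 'C', arrivesAt: 400, duration: 100 }, // arrives after A and B are done
];

console.log(simulate('switchMap', jobs));  // ['B', 'C']      (A cancelled by B)
console.log(simulate('mergeMap', jobs));   // ['B', 'A', 'C'] (finish order)
console.log(simulate('concatMap', jobs));  // ['A', 'B', 'C'] (arrival order)
console.log(simulate('exhaustMap', jobs)); // ['A', 'C']      (B ignored)
```

The same three inputs produce four different outputs, which is why the choice of operator matters more than the inner request itself.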

Filtering Operators

import { of, interval, Subject } from 'rxjs';
import {
  filter, take, takeUntil, takeWhile,
  debounceTime, throttleTime, distinctUntilChanged,
  first, last, skip
} from 'rxjs/operators';

// filter: Only pass values that meet the condition
of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0)
).subscribe(console.log); // 2, 4

// take: Receive N values then complete
interval(1000).pipe(
  take(5)
).subscribe(console.log); // 0, 1, 2, 3, 4 then complete

// takeUntil: Run until another Observable emits
const stop$ = new Subject<void>();
interval(1000).pipe(
  takeUntil(stop$)
).subscribe(n => console.log(n));
setTimeout(() => stop$.next(), 5000); // Stop after 5 seconds

// debounceTime: Wait N ms after last input (great for search)
// (searchInput is assumed to be an Observable<string> of typed search terms)
searchInput.pipe(
  debounceTime(300),
  distinctUntilChanged() // Ignore consecutive duplicate values
).subscribe(term => this.search(term));

// distinctUntilChanged: Ignore if same as previous value
of(1, 1, 2, 2, 3, 3).pipe(
  distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3

Combination Operators

import { combineLatest, forkJoin, merge, zip, concat, fromEvent, of } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';

// forkJoin: Returns array of last values when all complete (similar to Promise.all)
forkJoin([
  this.http.get('/api/users'),
  this.http.get('/api/products'),
  this.http.get('/api/orders')
]).subscribe(([users, products, orders]) => {
  // Runs after all requests complete
  this.initDashboard(users, products, orders);
});

// combineLatest: Emits with latest values from all when any changes
// (emits nothing until every source has emitted at least once)
combineLatest([
  this.filter$,
  this.sort$,
  this.page$
]).pipe(
  debounceTime(100),
  switchMap(([filter, sort, page]) =>
    this.http.get('/api/products', { params: { filter, sort, page } })
  )
).subscribe(products => this.products.set(products));

// merge: Combine multiple Observables into one (order irrelevant)
merge(
  fromEvent(document, 'click'),
  fromEvent(document, 'keypress')
).subscribe(event => console.log('Event:', event.type));

// zip: Pair values at the same index from each Observable
zip(
  of('Alice', 'Bob', 'Charlie'),
  of(30, 25, 35)
).subscribe(([name, age]) => console.log(`${name}: ${age} years old`));

AsyncPipe

AsyncPipe lets you subscribe to an Observable directly in the template and automatically unsubscribes when the component is destroyed.

import { Component, inject } from '@angular/core';
import { AsyncPipe, DatePipe } from '@angular/common';
import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';

// UserService and User are app-specific and assumed to be defined elsewhere
@Component({
  standalone: true,
  imports: [AsyncPipe, DatePipe], // DatePipe backs the date pipe used in the template
  template: `
    <!-- async pipe: auto subscribe/unsubscribe Observable -->
    @if (users$ | async; as users) {
      <ul>
        @for (user of users; track user.id) {
          <li>{{ user.name }}</li>
        }
      </ul>
    } @else {
      <p>Loading...</p>
    }

    <!-- Simple value display -->
    <p>Current time: {{ currentTime$ | async | date:'HH:mm:ss' }}</p>
  `
})
export class UserListComponent {
  private userService = inject(UserService);

  users$: Observable<User[]> = this.userService.getUsers();
  currentTime$ = interval(1000).pipe(map(() => new Date()));
}

takeUntilDestroyed (Angular 16+)

The takeUntilDestroyed operator completes the stream, and therefore unsubscribes, when the component is destroyed.

import { Component, DestroyRef, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';

@Component({ ... })
export class ModernComponent implements OnInit {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    // ngOnInit is outside the injection context, so DestroyRef is passed explicitly
    interval(1000).pipe(
      takeUntilDestroyed(this.destroyRef) // Auto-unsubscribe when component destroyed
    ).subscribe(n => console.log(n));
  }
}

// Or use directly in injection context
@Component({ ... })
export class ModernComponent2 {
  // Use directly in field initializer (no destroyRef needed)
  private count$ = interval(1000).pipe(
    takeUntilDestroyed() // Auto-injects DestroyRef in injection context
  );
}

Real-World Example — Search Autocomplete

// search/search.component.ts
import { Component, OnInit, inject, signal } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import { AsyncPipe } from '@angular/common';
import {
  debounceTime, distinctUntilChanged, switchMap,
  catchError, startWith, map
} from 'rxjs/operators';
import { Observable, of } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

interface SearchResult {
  id: number;
  title: string;
  category: string;
}

@Component({
  selector: 'app-search',
  standalone: true,
  imports: [ReactiveFormsModule, AsyncPipe],
  template: `
    <div class="search-container">
      <input
        [formControl]="searchControl"
        placeholder="Search products..."
        class="search-input"
        autocomplete="off"
      >

      <!-- Loading indicator -->
      @if (isSearching()) {
        <span class="spinner">Searching...</span>
      }

      <!-- Results dropdown -->
      @if (searchResults$ | async; as results) {
        @if (results.length > 0 && searchControl.value) {
          <ul class="results-dropdown">
            @for (result of results; track result.id) {
              <li (click)="selectResult(result)" class="result-item">
                <strong>{{ result.title }}</strong>
                <small>{{ result.category }}</small>
              </li>
            }
          </ul>
        } @else if (searchControl.value && searchControl.value.length > 1) {
          <p class="no-results">No results found.</p>
        }
      }

      <!-- Selected result -->
      @if (selectedResult()) {
        <div class="selected">
          Selected: <strong>{{ selectedResult()!.title }}</strong>
          <button (click)="clearSelection()">✕</button>
        </div>
      }
    </div>
  `
})
export class SearchComponent {
  private http = inject(HttpClient);

  searchControl = new FormControl('');
  selectedResult = signal<SearchResult | null>(null);
  isSearching = signal(false);

  searchResults$: Observable<SearchResult[]> = this.searchControl.valueChanges.pipe(
    startWith(''),

    // 300ms debounce: no requests while typing
    debounceTime(300),

    // Skip if same as previous value
    distinctUntilChanged(),

    // Handle empty search term
    switchMap(term => {
      if (!term || term.length < 2) {
        this.isSearching.set(false);
        return of([]);
      }

      // Capture the narrowed, lower-cased term once for use in the callbacks below
      const needle = term.toLowerCase();
      this.isSearching.set(true);

      // Cancel previous request and start new one
      return this.http.get<SearchResult[]>(
        `https://fakestoreapi.com/products`
      ).pipe(
        map(products => {
          // Response arrived: hide the loading indicator
          this.isSearching.set(false);

          // Client-side filtering (normally done server-side)
          return (products as any[])
            .filter((p: any) => p.title.toLowerCase().includes(needle))
            .slice(0, 5)
            .map((p: any) => ({
              id: p.id,
              title: p.title,
              category: p.category
            }));
        }),
        catchError(() => {
          this.isSearching.set(false);
          return of([]);
        })
      );
    }),

    // Auto-unsubscribe when component is destroyed
    takeUntilDestroyed()
  );

  selectResult(result: SearchResult) {
    this.selectedResult.set(result);
    this.searchControl.setValue(result.title, { emitEvent: false });
  }

  clearSelection() {
    this.selectedResult.set(null);
    this.searchControl.setValue('');
  }
}

Error Handling

import { catchError, retry, retryWhen, delay } from 'rxjs/operators';
import { throwError, of } from 'rxjs';

// catchError: Handle error and return fallback value
this.http.get('/api/products').pipe(
  catchError(err => {
    console.error('API error:', err);

    // Return fallback data
    return of([]);

    // Or re-throw
    // return throwError(() => new Error('Failed to load data'));
  })
).subscribe(products => this.products.set(products));

// retry: Retry N times on failure
this.http.get('/api/data').pipe(
  retry(3) // Retry up to 3 times
).subscribe();

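// Exponential backoff retry: wait 1s, 2s, then 4s before retries 1, 2, and 3
// (retryWhen is deprecated in RxJS 7; retry({ count, delay }) is the newer form)
import { timer } from 'rxjs';
import { retryWhen, delayWhen, scan } from 'rxjs/operators';

this.http.get('/api/data').pipe(
  retryWhen(errors =>
    errors.pipe(
      scan((retryCount, error) => {
        if (retryCount >= 3) throw error; // Give up after 3 retries
        return retryCount + 1;
      }, 0),
      // Double the delay on each retry: 1000ms, 2000ms, 4000ms
      delayWhen(retryCount => timer(2 ** (retryCount - 1) * 1000))
    )
  )
).subscribe();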
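
An exponential schedule doubles the wait on each retry, whereas multiplying the retry count by a constant gives a linear one. The arithmetic can be sketched independently of RxJS; backoffDelayMs, linearDelayMs, and the 30-second cap are assumptions made for illustration, not part of any library.

```typescript
// Delay before retry number `attempt` (1-based):
// baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** (attempt - 1), maxMs);
}

// Linear schedule for comparison: grows by a constant baseMs per attempt.
function linearDelayMs(attempt: number, baseMs = 1000): number {
  return attempt * baseMs;
}

const attempts = [1, 2, 3, 4, 5, 6];
// Wrap in arrows so map's index argument is not mistaken for baseMs.
const exponentialDelays = attempts.map(attempt => backoffDelayMs(attempt));
const linearDelays = attempts.map(attempt => linearDelayMs(attempt));

console.log(exponentialDelays); // [1000, 2000, 4000, 8000, 16000, 30000]
console.log(linearDelays);      // [1000, 2000, 3000, 4000, 5000, 6000]
```

A function like backoffDelayMs could supply the duration passed to timer inside a retry pipeline; the cap keeps late retries from waiting unreasonably long.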

Pro Tips

Tip 1: Subscription Management Patterns

// Pattern 1: takeUntilDestroyed (Angular 16+, recommended)
// Call it in an injection context (constructor or field initializer),
// or pass a DestroyRef explicitly when used elsewhere (e.g. in ngOnInit).
someObservable$.pipe(
  takeUntilDestroyed()
).subscribe(value => this.data.set(value));

// Pattern 2: AsyncPipe (auto subscribe/unsubscribe)
// In template: {{ data$ | async }}

// Pattern 3: Manual management (legacy); Subscription is imported from 'rxjs'
private subscription = new Subscription();
ngOnInit() {
  this.subscription.add(obs$.subscribe(...));
}
ngOnDestroy() {
  this.subscription.unsubscribe();
}

Tip 2: Event Bus with Subject

import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';
import { filter, map } from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class EventBusService {
  private events$ = new Subject<{ type: string; payload: any }>();

  emit(type: string, payload: any) {
    this.events$.next({ type, payload });
  }

  on(type: string) {
    return this.events$.pipe(
      filter(event => event.type === type),
      map(event => event.payload)
    );
  }
}

Tip 3: The $ Naming Convention

Appending $ to Observable variable names is a widely used convention in the Angular community (sometimes called Finnish notation): users$, currentUser$, searchResults$.
