16.6 RxJS & Reactive Programming — Observable, Subject, Key Operators
What Is RxJS?
RxJS (Reactive Extensions for JavaScript) is a library for working with asynchronous data streams. It implements the reactive programming paradigm — "treating everything as a stream" — in JavaScript.
Angular uses RxJS in its core features: HttpClient, Router, Forms, and more.
Reactive Programming Concept
Traditional approach:
Request data → Wait → Response → Process
Reactive approach:
Subscribe to data stream → Automatically process data as it arrives
(Click events, HTTP responses, timers, user input — all unified as streams)
Observable vs Promise
| Aspect | Observable | Promise |
|---|---|---|
| Data | Multiple values (stream) | Single value |
| Execution | Lazy (starts on subscribe) | Eager (starts immediately) |
| Cancellation | Possible (unsubscribe) | Not possible |
| Operators | 100+ rich operators | then/catch/finally |
| Sync support | Yes | No |
| Use case | Events, HTTP, realtime | One-time async |
import { Observable, of, from } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Create an Observable directly
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// Subscribe (starts execution)
const subscription = observable$.subscribe({
next: value => console.log('Value:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completed!')
});
// Unsubscribe (prevent memory leaks)
subscription.unsubscribe();
Subject Types
A Subject is both an Observable and an Observer. It multicasts values to multiple subscribers.
Subject — Basic
import { Subject } from 'rxjs';
const subject$ = new Subject<string>();
// Two subscribers
subject$.subscribe(v => console.log('Subscriber 1:', v));
subject$.subscribe(v => console.log('Subscriber 2:', v));
subject$.next('Hello'); // Both subscribers receive it
subject$.next('World');
// Subscriber 1: Hello / Subscriber 2: Hello
// Subscriber 1: World / Subscriber 2: World
// Subscribers added after a value is emitted don't receive past values
subject$.subscribe(v => console.log('Subscriber 3:', v)); // Receives only future values
BehaviorSubject — Holds Current Value
import { BehaviorSubject } from 'rxjs';
// Initial value required
const count$ = new BehaviorSubject<number>(0);
// Even late subscribers immediately receive the current value (0)
count$.subscribe(v => console.log('Current count:', v)); // Immediately prints 0
count$.next(1); // → 1
count$.next(2); // → 2
// Read current value synchronously
console.log(count$.getValue()); // 2
// Commonly used for state management in services
@Injectable({ providedIn: 'root' })
export class CartService {
private cartItems$ = new BehaviorSubject<CartItem[]>([]);
// Expose as read-only Observable
readonly items$ = this.cartItems$.asObservable();
addItem(item: CartItem) {
const current = this.cartItems$.getValue();
this.cartItems$.next([...current, item]);
}
}
ReplaySubject — Replay Last N Values
import { ReplaySubject } from 'rxjs';
// Buffer the last 3 values
const replay$ = new ReplaySubject<number>(3);
replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);
// Late subscribers receive the last 3 values (2, 3, 4)
replay$.subscribe(v => console.log(v)); // 2, 3, 4
Observable Creation Operators
import { of, from, interval, timer, fromEvent, EMPTY, NEVER } from 'rxjs';
// of: Observable from synchronous values
of(1, 2, 3).subscribe(console.log); // 1, 2, 3
// from: Array, Promise, Iterable → Observable
from([1, 2, 3]).subscribe(console.log);
from(fetch('/api/users')).subscribe(console.log);
from(Promise.resolve('hello')).subscribe(console.log);
// interval: Emit values periodically (0, 1, 2, ...)
interval(1000).subscribe(n => console.log(`${n} seconds elapsed`));
// timer: Emit once after delay, or periodically after delay
timer(3000).subscribe(() => console.log('Runs after 3 seconds'));
timer(0, 1000).subscribe(n => console.log(`${n}s`)); // Starts immediately, every second
// fromEvent: Convert DOM events to Observable
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(event => console.log('Clicked!', event));
// EMPTY: Completes immediately
EMPTY.subscribe({ complete: () => console.log('Completed') });
// NEVER: Emits nothing and never completes
Key Transformation Operators
map — Transform
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6
// Object transformation
this.http.get<ApiResponse>('/api/users').pipe(
map(response => response.data),
map(users => users.filter(u => u.active))
).subscribe(users => this.users.set(users));
switchMap — Switch to Latest Inner Observable (great for search)
import { switchMap, debounceTime } from 'rxjs/operators';
searchControl.valueChanges.pipe(
debounceTime(300),
switchMap(term =>
// Cancels previous request and switches to the new one
this.http.get<Product[]>(`/api/products?q=${term}`)
)
).subscribe(products => this.results.set(products));
mergeMap — Process Multiple Inner Observables Concurrently
import { mergeMap } from 'rxjs/operators';
// Upload multiple files simultaneously
from(selectedFiles).pipe(
mergeMap(file => this.uploadService.upload(file))
).subscribe(result => console.log('Upload complete:', result.name));
concatMap — Process in Order (wait for completion)
import { concatMap } from 'rxjs/operators';
// Guarantee request order (start next after previous completes)
from(orderedRequests).pipe(
concatMap(req => this.http.post('/api/process', req))
).subscribe(result => console.log('Processed:', result));
exhaustMap — Ignore New Requests While In Progress (prevent duplicate submit)
import { exhaustMap } from 'rxjs/operators';
// Button click — ignore additional clicks while processing
fromEvent(submitButton, 'click').pipe(
exhaustMap(() => this.http.post('/api/order', orderData))
).subscribe(result => console.log('Order complete:', result));
Comparison of flatMap Variants
| Operator | Behavior | Use Case |
|---|---|---|
switchMap | Cancel previous, keep latest | Real-time search, autocomplete |
mergeMap | Concurrent execution, order irrelevant | Parallel file uploads |
concatMap | Sequential, one at a time | Order-sensitive operations |
exhaustMap | Ignore if in progress | Prevent duplicate form submissions |
Filtering Operators
import {
filter, take, takeUntil, takeWhile,
debounceTime, throttleTime, distinctUntilChanged,
first, last, skip
} from 'rxjs/operators';
// filter: Only pass values that meet the condition
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0)
).subscribe(console.log); // 2, 4
// take: Receive N values then complete
interval(1000).pipe(
take(5)
).subscribe(console.log); // 0, 1, 2, 3, 4 then complete
// takeUntil: Run until another Observable emits
const stop$ = new Subject<void>();
interval(1000).pipe(
takeUntil(stop$)
).subscribe(n => console.log(n));
setTimeout(() => stop$.next(), 5000); // Stop after 5 seconds
// debounceTime: Wait N ms after last input (great for search)
searchInput.pipe(
debounceTime(300),
distinctUntilChanged() // Ignore consecutive duplicate values
).subscribe(term => this.search(term));
// distinctUntilChanged: Ignore if same as previous value
of(1, 1, 2, 2, 3, 3).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3
Combination Operators
import { combineLatest, forkJoin, merge, zip, concat } from 'rxjs';
// forkJoin: Returns array of last values when all complete (similar to Promise.all)
forkJoin([
this.http.get('/api/users'),
this.http.get('/api/products'),
this.http.get('/api/orders')
]).subscribe(([users, products, orders]) => {
// Runs after all requests complete
this.initDashboard(users, products, orders);
});
// combineLatest: Emits with latest values from all when any changes
combineLatest([
this.filter$,
this.sort$,
this.page$
]).pipe(
debounceTime(100),
switchMap(([filter, sort, page]) =>
this.http.get('/api/products', { params: { filter, sort, page } })
)
).subscribe(products => this.products.set(products));
// merge: Combine multiple Observables into one (order irrelevant)
merge(
fromEvent(document, 'click'),
fromEvent(document, 'keypress')
).subscribe(event => console.log('Event:', event.type));
// zip: Pair values at the same index from each Observable
zip(
of('Alice', 'Bob', 'Charlie'),
of(30, 25, 35)
).subscribe(([name, age]) => console.log(`${name}: ${age} years old`));
AsyncPipe
AsyncPipe lets you subscribe to an Observable directly in the template and automatically unsubscribes when the component is destroyed.
import { Component, inject } from '@angular/core';
import { AsyncPipe } from '@angular/common';
import { Observable } from 'rxjs';
@Component({
standalone: true,
imports: [AsyncPipe],
template: `
<!-- async pipe: auto subscribe/unsubscribe Observable -->
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>Loading...</p>
}
<!-- Simple value display -->
<p>Current time: {{ currentTime$ | async | date:'HH:mm:ss' }}</p>
`
})
export class UserListComponent {
private userService = inject(UserService);
users$: Observable<User[]> = this.userService.getUsers();
currentTime$ = interval(1000).pipe(map(() => new Date()));
}
takeUntilDestroyed (Angular 16+)
Automatically unsubscribes when the component is destroyed.
import { Component, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef } from '@angular/core';
@Component({ ... })
export class ModernComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(1000).pipe(
takeUntilDestroyed(this.destroyRef) // Auto-unsubscribe when component destroyed
).subscribe(n => console.log(n));
}
}
// Or use directly in injection context
@Component({ ... })
export class ModernComponent2 {
// Use directly in field initializer (no destroyRef needed)
private count$ = interval(1000).pipe(
takeUntilDestroyed() // Auto-injects DestroyRef in injection context
);
}
Real-World Example — Search Autocomplete
// search/search.component.ts
import { Component, OnInit, inject, signal } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import { AsyncPipe } from '@angular/common';
import {
debounceTime, distinctUntilChanged, switchMap,
catchError, startWith, map
} from 'rxjs/operators';
import { Observable, of } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
interface SearchResult {
id: number;
title: string;
category: string;
}
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule, AsyncPipe],
template: `
<div class="search-container">
<input
[formControl]="searchControl"
placeholder="Search products..."
class="search-input"
autocomplete="off"
>
<!-- Loading indicator -->
@if (isSearching()) {
<span class="spinner">Searching...</span>
}
<!-- Results dropdown -->
@if (searchResults$ | async; as results) {
@if (results.length > 0 && searchControl.value) {
<ul class="results-dropdown">
@for (result of results; track result.id) {
<li (click)="selectResult(result)" class="result-item">
<strong>{{ result.title }}</strong>
<small>{{ result.category }}</small>
</li>
}
</ul>
} @else if (searchControl.value && searchControl.value.length > 1) {
<p class="no-results">No results found.</p>
}
}
<!-- Selected result -->
@if (selectedResult()) {
<div class="selected">
Selected: <strong>{{ selectedResult()!.title }}</strong>
<button (click)="clearSelection()">✕</button>
</div>
}
</div>
`
})
export class SearchComponent {
private http = inject(HttpClient);
searchControl = new FormControl('');
selectedResult = signal<SearchResult | null>(null);
isSearching = signal(false);
searchResults$: Observable<SearchResult[]> = this.searchControl.valueChanges.pipe(
startWith(''),
// 300ms debounce: no requests while typing
debounceTime(300),
// Skip if same as previous value
distinctUntilChanged(),
// Handle empty search term
switchMap(term => {
if (!term || term.length < 2) {
this.isSearching.set(false);
return of([]);
}
this.isSearching.set(true);
// Cancel previous request and start new one
return this.http.get<SearchResult[]>(
`https://fakestoreapi.com/products`
).pipe(
map(products =>
// Client-side filtering (normally done server-side)
(products as any[])
.filter((p: any) => p.title.toLowerCase().includes(term.toLowerCase()))
.slice(0, 5)
.map((p: any) => ({
id: p.id,
title: p.title,
category: p.category
}))
),
catchError(() => {
this.isSearching.set(false);
return of([]);
})
);
}),
// Auto-unsubscribe when component is destroyed
takeUntilDestroyed()
);
selectResult(result: SearchResult) {
this.selectedResult.set(result);
this.searchControl.setValue(result.title, { emitEvent: false });
}
clearSelection() {
this.selectedResult.set(null);
this.searchControl.setValue('');
}
}
Error Handling
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';
import { throwError, of } from 'rxjs';
// catchError: Handle error and return fallback value
this.http.get('/api/products').pipe(
catchError(err => {
console.error('API error:', err);
// Return fallback data
return of([]);
// Or re-throw
// return throwError(() => new Error('Failed to load data'));
})
).subscribe(products => this.products.set(products));
// retry: Retry N times on failure
this.http.get('/api/data').pipe(
retry(3) // Retry up to 3 times
).subscribe();
// Exponential backoff retry
import { timer } from 'rxjs';
import { retryWhen, delayWhen, scan } from 'rxjs/operators';
this.http.get('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) throw error;
return retryCount + 1;
}, 0),
delayWhen(retryCount => timer(retryCount * 1000))
)
)
).subscribe();
Pro Tips
Tip 1: Subscription Management Patterns
// Pattern 1: takeUntilDestroyed (Angular 16+, recommended)
someObservable$.pipe(
takeUntilDestroyed()
).subscribe(value => this.data.set(value));
// Pattern 2: AsyncPipe (auto subscribe/unsubscribe)
// In template: {{ data$ | async }}
// Pattern 3: Manual management (legacy)
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(obs$.subscribe(...));
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
Tip 2: Event Bus with Subject
@Injectable({ providedIn: 'root' })
export class EventBusService {
private events$ = new Subject<{ type: string; payload: any }>();
emit(type: string, payload: any) {
this.events$.next({ type, payload });
}
on(type: string) {
return this.events$.pipe(
filter(event => event.type === type),
map(event => event.payload)
);
}
}
Tip 3: The $ Naming Convention
Appending $ to Observable variable names is a standard Angular community convention: users$, currentUser$, searchResults$