16.6 RxJS & 반응형 프로그래밍 — Observable, Subject, 주요 연산자
RxJS란?
**RxJS(Reactive Extensions for JavaScript)**는 비동기 데이터 스트림을 다루는 라이브러리입니다. "모든 것을 스트림으로 보는" 반응형 프로그래밍 패러다임을 JavaScript로 구현합니다.
Angular는 HttpClient, Router, Forms 등 핵심 기능에서 RxJS를 사용합니다.
반응형 프로그래밍 개념
전통적 방식:
데이터 요청 → 기다림 → 응답 → 처리
반응형 방식:
데이터 스트림 구독 → 데이터가 올 때마다 자동으로 처리
(클릭 이벤트, HTTP 응답, 타이머, 사용자 입력 모두 스트림으로 통일)
Observable vs Promise
| 항목 | Observable | Promise |
|---|---|---|
| 데이터 | 여러 값 (스트림) | 단일 값 |
| 실행 | lazy (구독 시 시작) | eager (즉시 시작) |
| 취소 | 가능 (unsubscribe) | 불가능 |
| 연산자 | 100+ 풍부한 연산자 | then/catch/finally |
| 동기 지원 | 가능 | 불가능 |
| 사용처 | 이벤트, HTTP, 실시간 | 일회성 비동기 |
import { Observable, of, from } from 'rxjs';
import { map, filter } from 'rxjs/operators';
// Observable 직접 생성
const observable$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
// 구독 (실행 시작)
const subscription = observable$.subscribe({
next: value => console.log('값:', value),
error: err => console.error('오류:', err),
complete: () => console.log('완료!')
});
// 구독 해제 (메모리 누수 방지)
subscription.unsubscribe();
Subject 종류
Subject는 Observable이면서 Observer이기도 합니다. 여러 구독자에게 값을 멀티캐스트합니다.
Subject — 기본
import { Subject } from 'rxjs';
const subject$ = new Subject<string>();
// 두 구독자
subject$.subscribe(v => console.log('구독자1:', v));
subject$.subscribe(v => console.log('구독자2:', v));
subject$.next('안녕'); // 두 구독자 모두 받음
subject$.next('세상아');
// 구독자1: 안녕 / 구독자2: 안녕
// 구독자1: 세상아 / 구독자2: 세상아
// 구독 후에는 이전 값 받지 못함
subject$.subscribe(v => console.log('구독자3:', v)); // 이후 값만 받음
BehaviorSubject — 현재 값 보유
import { BehaviorSubject } from 'rxjs';
// 초기값 필수
const count$ = new BehaviorSubject<number>(0);
// 나중에 구독해도 현재 값(0)을 즉시 받음
count$.subscribe(v => console.log('현재 카운트:', v)); // 즉시 0 출력
count$.next(1); // → 1
count$.next(2); // → 2
// 현재 값 동기적으로 읽기
console.log(count$.getValue()); // 2
// 서비스에서 상태 관리에 자주 사용
@Injectable({ providedIn: 'root' })
export class CartService {
private cartItems$ = new BehaviorSubject<CartItem[]>([]);
// 읽기 전용 Observable로 노출
readonly items$ = this.cartItems$.asObservable();
addItem(item: CartItem) {
const current = this.cartItems$.getValue();
this.cartItems$.next([...current, item]);
}
}
ReplaySubject — 과거 N개 값 재생
import { ReplaySubject } from 'rxjs';
// 최근 3개 값을 버퍼링
const replay$ = new ReplaySubject<number>(3);
replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);
// 나중에 구독해도 최근 3개(2, 3, 4)를 받음
replay$.subscribe(v => console.log(v)); // 2, 3, 4
Observable 생성 연산자
import { of, from, interval, timer, fromEvent, EMPTY, NEVER } from 'rxjs';
// of: 동기 값들의 Observable
of(1, 2, 3).subscribe(console.log); // 1, 2, 3
// from: 배열, Promise, Iterable → Observable
from([1, 2, 3]).subscribe(console.log);
from(fetch('/api/users')).subscribe(console.log);
from(Promise.resolve('hello')).subscribe(console.log);
// interval: 주기적으로 값 방출 (0, 1, 2, ...)
interval(1000).subscribe(n => console.log(`${n}초 경과`));
// timer: 지연 후 한 번, 또는 지연 후 주기적으로
timer(3000).subscribe(() => console.log('3초 후 실행'));
timer(0, 1000).subscribe(n => console.log(`${n}초`)); // 즉시 시작, 1초마다
// fromEvent: DOM 이벤트를 Observable로
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(event => console.log('클릭!', event));
// EMPTY: 즉시 완료
EMPTY.subscribe({ complete: () => console.log('완료') });
// NEVER: 아무것도 방출하지 않고 완료도 안 함
주요 변환 연산자
map — 변환
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(n => n * 2)
).subscribe(console.log); // 2, 4, 6
// 객체 변환
this.http.get<ApiResponse>('/api/users').pipe(
map(response => response.data),
map(users => users.filter(u => u.active))
).subscribe(users => this.users.set(users));
switchMap — 최신 내부 Observable로 전환 (검색에 적합)
import { switchMap, debounceTime } from 'rxjs/operators';
searchControl.valueChanges.pipe(
debounceTime(300),
switchMap(term =>
// 이전 요청을 취소하고 새 요청으로 전환
this.http.get<Product[]>(`/api/products?q=${term}`)
)
).subscribe(products => this.results.set(products));
mergeMap — 동시에 여러 내부 Observable 처리
import { mergeMap } from 'rxjs/operators';
// 여러 파일을 동시에 업로드
from(selectedFiles).pipe(
mergeMap(file => this.uploadService.upload(file))
).subscribe(result => console.log('업로드 완료:', result.name));
concatMap — 순서대로 처리 (완료 후 다음)
import { concatMap } from 'rxjs/operators';
// 요청 순서 보장 (이전 완료 후 다음 시작)
from(orderedRequests).pipe(
concatMap(req => this.http.post('/api/process', req))
).subscribe(result => console.log('처리 완료:', result));
exhaustMap — 진행 중이면 새 요청 무시 (중복 제출 방지)
import { exhaustMap } from 'rxjs/operators';
// 버튼 클릭 — 처리 중이면 추가 클릭 무시
fromEvent(submitButton, 'click').pipe(
exhaustMap(() => this.http.post('/api/order', orderData))
).subscribe(result => console.log('주문 완료:', result));
flatMap 계열 비교
| 연산자 | 동작 | 사용 사례 |
|---|---|---|
switchMap | 이전 취소, 최신만 | 실시간 검색, 자동완성 |
mergeMap | 동시 실행, 순서 무관 | 파일 병렬 업로드 |
concatMap | 순서대로 하나씩 | 순서 중요한 작업 |
exhaustMap | 진행 중이면 무시 | 중복 폼 제출 방지 |
필터링 연산자
import {
filter, take, takeUntil, takeWhile,
debounceTime, throttleTime, distinctUntilChanged,
first, last, skip
} from 'rxjs/operators';
// filter: 조건을 만족하는 값만 통과
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0)
).subscribe(console.log); // 2, 4
// take: N개만 받고 완료
interval(1000).pipe(
take(5)
).subscribe(console.log); // 0, 1, 2, 3, 4 후 완료
// takeUntil: 다른 Observable이 방출할 때까지
const stop$ = new Subject<void>();
interval(1000).pipe(
takeUntil(stop$)
).subscribe(n => console.log(n));
setTimeout(() => stop$.next(), 5000); // 5초 후 중단
// debounceTime: 마지막 입력 후 N ms 대기 (검색에 적합)
searchInput.pipe(
debounceTime(300),
distinctUntilChanged() // 동일한 값 연속 방출 무시
).subscribe(term => this.search(term));
// distinctUntilChanged: 이전 값과 같으면 무시
of(1, 1, 2, 2, 3, 3).pipe(
distinctUntilChanged()
).subscribe(console.log); // 1, 2, 3
결합 연산자
import { combineLatest, forkJoin, merge, zip, concat } from 'rxjs';
// forkJoin: 모두 완료 시 마지막 값 배열로 반환 (Promise.all과 유사)
forkJoin([
this.http.get('/api/users'),
this.http.get('/api/products'),
this.http.get('/api/orders')
]).subscribe(([users, products, orders]) => {
// 모든 요청 완료 후 실행
this.initDashboard(users, products, orders);
});
// combineLatest: 하나라도 변경되면 모두의 최신 값으로 방출
combineLatest([
this.filter$,
this.sort$,
this.page$
]).pipe(
debounceTime(100),
switchMap(([filter, sort, page]) =>
this.http.get('/api/products', { params: { filter, sort, page } })
)
).subscribe(products => this.products.set(products));
// merge: 여러 Observable을 하나로 합침 (순서 무관)
merge(
fromEvent(document, 'click'),
fromEvent(document, 'keypress')
).subscribe(event => console.log('이벤트:', event.type));
// zip: 각 Observable의 같은 인덱스 값을 묶어서 방출
zip(
of('Alice', 'Bob', 'Charlie'),
of(30, 25, 35)
).subscribe(([name, age]) => console.log(`${name}: ${age}세`));
AsyncPipe
AsyncPipe를 사용하면 Observable을 템플릿에서 직접 구독하고, 컴포넌트 소멸 시 자동으로 구독 해제합니다.
import { Component, inject } from '@angular/core';
import { AsyncPipe } from '@angular/common';
import { Observable } from 'rxjs';
@Component({
standalone: true,
imports: [AsyncPipe],
template: `
<!-- async 파이프: Observable 자동 구독/해제 -->
@if (users$ | async; as users) {
<ul>
@for (user of users; track user.id) {
<li>{{ user.name }}</li>
}
</ul>
} @else {
<p>로딩 중...</p>
}
<!-- 단순한 값 표시 -->
<p>현재 시간: {{ currentTime$ | async | date:'HH:mm:ss' }}</p>
`
})
export class UserListComponent {
private userService = inject(UserService);
users$: Observable<User[]> = this.userService.getUsers();
currentTime$ = interval(1000).pipe(map(() => new Date()));
}
takeUntilDestroyed (Angular 16+)
컴포넌트 소멸 시 자동으로 구독을 해제합니다.
import { Component, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef } from '@angular/core';
@Component({ ... })
export class ModernComponent implements OnInit {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(1000).pipe(
takeUntilDestroyed(this.destroyRef) // 컴포넌트 소멸 시 자동 해제
).subscribe(n => console.log(n));
}
}
// 또는 injection context에서 직접 사용
@Component({ ... })
export class ModernComponent2 {
// 생성자/필드 초기화에서 직접 사용 (destroyRef 불필요)
private count$ = interval(1000).pipe(
takeUntilDestroyed() // injection context에서 자동으로 DestroyRef 주입
);
}
실전 예제 — 검색 자동완성
// search/search.component.ts
import { Component, OnInit, inject, signal } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { HttpClient } from '@angular/common/http';
import { AsyncPipe } from '@angular/common';
import {
debounceTime, distinctUntilChanged, switchMap,
catchError, startWith, map
} from 'rxjs/operators';
import { Observable, of } from 'rxjs';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
interface SearchResult {
id: number;
title: string;
category: string;
}
@Component({
selector: 'app-search',
standalone: true,
imports: [ReactiveFormsModule, AsyncPipe],
template: `
<div class="search-container">
<input
[formControl]="searchControl"
placeholder="상품 검색..."
class="search-input"
autocomplete="off"
>
<!-- 로딩 표시 -->
@if (isSearching()) {
<span class="spinner">🔍</span>
}
<!-- 검색 결과 드롭다운 -->
@if (searchResults$ | async; as results) {
@if (results.length > 0 && searchControl.value) {
<ul class="results-dropdown">
@for (result of results; track result.id) {
<li (click)="selectResult(result)" class="result-item">
<strong>{{ result.title }}</strong>
<small>{{ result.category }}</small>
</li>
}
</ul>
} @else if (searchControl.value && searchControl.value.length > 1) {
<p class="no-results">검색 결과가 없습니다.</p>
}
}
<!-- 선택된 결과 -->
@if (selectedResult()) {
<div class="selected">
선택됨: <strong>{{ selectedResult()!.title }}</strong>
<button (click)="clearSelection()">✕</button>
</div>
}
</div>
`,
styles: [`
.search-container { position: relative; max-width: 400px; }
.search-input { width: 100%; padding: 0.75rem; font-size: 1rem; }
.results-dropdown {
position: absolute; width: 100%; background: white;
border: 1px solid #ddd; border-radius: 4px;
list-style: none; padding: 0; margin: 0; z-index: 100;
max-height: 300px; overflow-y: auto;
}
.result-item {
padding: 0.75rem; cursor: pointer;
display: flex; justify-content: space-between;
}
.result-item:hover { background: #f0f0f0; }
`]
})
export class SearchComponent {
private http = inject(HttpClient);
searchControl = new FormControl('');
selectedResult = signal<SearchResult | null>(null);
isSearching = signal(false);
searchResults$: Observable<SearchResult[]> = this.searchControl.valueChanges.pipe(
startWith(''),
// 300ms 디바운스: 입력 중에는 요청 안 함
debounceTime(300),
// 이전 값과 같으면 스킵
distinctUntilChanged(),
// 빈 검색어 처리
switchMap(term => {
if (!term || term.length < 2) {
this.isSearching.set(false);
return of([]);
}
this.isSearching.set(true);
// 이전 요청 취소 후 새 요청
return this.http.get<SearchResult[]>(
`https://fakestoreapi.com/products`
).pipe(
map(products =>
// 클라이언트 필터링 (실제는 서버에서)
(products as any[])
.filter((p: any) => p.title.toLowerCase().includes(term.toLowerCase()))
.slice(0, 5)
.map((p: any) => ({
id: p.id,
title: p.title,
category: p.category
}))
),
catchError(() => {
this.isSearching.set(false);
return of([]);
})
);
}),
// 컴포넌트 소멸 시 자동 구독 해제
takeUntilDestroyed()
);
selectResult(result: SearchResult) {
this.selectedResult.set(result);
this.searchControl.setValue(result.title, { emitEvent: false });
}
clearSelection() {
this.selectedResult.set(null);
this.searchControl.setValue('');
}
}
오류 처리
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';
import { throwError, of } from 'rxjs';
// catchError: 오류 처리 후 대체값 반환
this.http.get('/api/products').pipe(
catchError(err => {
console.error('API 오류:', err);
// 대체 데이터 반환
return of([]);
// 또는 오류 재발생
// return throwError(() => new Error('데이터 로드 실패'));
})
).subscribe(products => this.products.set(products));
// retry: 실패 시 N번 재시도
this.http.get('/api/data').pipe(
retry(3) // 최대 3번 재시도
).subscribe();
// 지수 백오프 재시도
import { timer } from 'rxjs';
import { retryWhen, delayWhen, scan } from 'rxjs/operators';
this.http.get('/api/data').pipe(
retryWhen(errors =>
errors.pipe(
scan((retryCount, error) => {
if (retryCount >= 3) throw error;
return retryCount + 1;
}, 0),
delayWhen(retryCount => timer(retryCount * 1000))
)
)
).subscribe();
고수 팁
팁 1: 구독 관리 패턴
// 패턴 1: takeUntilDestroyed (Angular 16+, 권장)
someObservable$.pipe(
takeUntilDestroyed()
).subscribe(value => this.data.set(value));
// 패턴 2: AsyncPipe (구독/해제 자동)
// 템플릿에서: {{ data$ | async }}
// 패턴 3: 직접 관리 (레거시)
private subscription = new Subscription();
ngOnInit() {
this.subscription.add(obs$.subscribe(...));
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
팁 2: Subject로 이벤트 버스 구현
@Injectable({ providedIn: 'root' })
export class EventBusService {
private events$ = new Subject<{ type: string; payload: any }>();
emit(type: string, payload: any) {
this.events$.next({ type, payload });
}
on(type: string) {
return this.events$.pipe(
filter(event => event.type === type),
map(event => event.payload)
);
}
}
팁 3: $ 네이밍 컨벤션
Observable 변수명 끝에 $를 붙이는 것이 Angular 커뮤니티의 관례입니다: users$, currentUser$, searchResults$