비동기 이터레이터
비동기 이터레이터는 데이터를 한 번에 모두 가져오지 않고, 스트리밍 방식으로 순차 처리할 때 사용합니다. 대용량 데이터, API 페이지네이션, 실시간 스트림에 특히 유용합니다.
비동기 이터러블 프로토콜
비동기 이터러블은 Symbol.asyncIterator 메서드를 구현한 객체입니다.
// 비동기 이터러블 프로토콜 구조
const asyncIterable = {
[Symbol.asyncIterator]() {
let i = 0;
return {
async next() {
if (i < 3) {
await delay(100); // 비동기 작업 시뮬레이션
return { value: i++, done: false };
}
return { value: undefined, done: true };
}
};
}
};
// 사용: for await...of
for await (const value of asyncIterable) {
console.log(value); // 0, 1, 2 (각각 100ms 간격)
}
for await...of 문법
// 기본 사용
async function processAsyncData() {
const asyncItems = getAsyncIterable();
for await (const item of asyncItems) {
await processItem(item);
console.log('처리됨:', item);
}
}
// Promise 배열에도 사용 가능
async function processPromises() {
const promises = [
fetch('/api/a').then(r => r.json()),
fetch('/api/b').then(r => r.json()),
fetch('/api/c').then(r => r.json())
];
for await (const result of promises) {
console.log(result); // 순서대로 처리
}
}
// break/continue 지원
async function findFirst() {
for await (const item of getLargeDataStream()) {
if (item.score > 90) {
console.log('찾았습니다:', item);
break; // 나머지 스트림 처리 중단
}
if (item.deleted) continue; // 삭제된 항목 건너뜀
await processItem(item);
}
}
비동기 제너레이터: async function*
// 비동기 제너레이터 기본
async function* asyncCounter(start, end) {
for (let i = start; i <= end; i++) {
await delay(100);
yield i;
}
}
// 사용
for await (const num of asyncCounter(1, 5)) {
console.log(num); // 1, 2, 3, 4, 5 (100ms 간격)
}
// API 페이지네이션 제너레이터
async function* fetchAllPages(baseUrl) {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await fetch(`${baseUrl}?page=${page}&limit=100`);
const { data, pagination } = await response.json();
for (const item of data) {
yield item; // 한 항목씩 yield
}
hasMore = pagination.page < pagination.totalPages;
page++;
}
}
// 사용: 모든 사용자를 페이지 상관없이 처리
for await (const user of fetchAllPages('/api/users')) {
await processUser(user);
}
스트리밍 데이터 처리 예제
// 대용량 CSV 파일 스트리밍 파싱
async function* parseCSVStream(readableStream) {
const reader = readableStream.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
// 남은 버퍼 처리
if (buffer.trim()) yield parseCSVLine(buffer);
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? ''; // 마지막 불완전한 줄은 버퍼에 유지
for (const line of lines) {
if (line.trim()) yield parseCSVLine(line);
}
}
} finally {
reader.releaseLock();
}
}
function parseCSVLine(line) {
return line.split(',').map(cell => cell.trim());
}
// 사용
const response = await fetch('/large-data.csv');
let rowCount = 0;
for await (const row of parseCSVStream(response.body)) {
rowCount++;
if (rowCount % 1000 === 0) {
console.log(`${rowCount}행 처리 완료`);
}
await processRow(row);
}
ReadableStream API와 연결
// fetch Response는 ReadableStream 제공
async function streamJsonLines(url) {
const response = await fetch(url);
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
const lines = text.split('\n').filter(Boolean);
for (const line of lines) {
yield JSON.parse(line);
}
}
} finally {
reader.releaseLock();
}
}
// LLM API 스트리밍 응답 처리 (ChatGPT 스타일)
async function* streamChat(messages) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value, { stream: true });
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
const content = parsed.choices?.[0]?.delta?.content;
if (content) yield content;
} catch {
// JSON 파싱 에러 무시
}
}
}
}
} finally {
reader.releaseLock();
}
}
// UI에서 실시간으로 텍스트 업데이트
async function displayStreamingResponse(messages) {
const outputElement = document.getElementById('output');
outputElement.textContent = '';
for await (const chunk of streamChat(messages)) {
outputElement.textContent += chunk;
}
}
실전 예제 1: 페이지네이션 자동 처리
// 범용 페이지네이션 이터레이터
async function* paginate(fetchPage) {
let cursor = null;
while (true) {
const { items, nextCursor } = await fetchPage(cursor);
for (const item of items) {
yield item;
}
if (!nextCursor) break;
cursor = nextCursor;
}
}
// GitHub API 페이지네이션 예시
async function* getAllGitHubRepos(org) {
yield* paginate(async (cursor) => {
const url = cursor
? `/api/github/orgs/${org}/repos?after=${cursor}`
: `/api/github/orgs/${org}/repos`;
const response = await fetch(url);
const data = await response.json();
return {
items: data.repositories,
nextCursor: data.pageInfo.hasNextPage ? data.pageInfo.endCursor : null
};
});
}
// 전체 레포지토리 처리
let count = 0;
for await (const repo of getAllGitHubRepos('microsoft')) {
count++;
if (repo.stars > 10000) {
console.log(`인기 레포: ${repo.name} (⭐ ${repo.stars})`);
}
}
console.log(`총 ${count}개 레포지토리 처리 완료`);
// 배열로 수집
async function collectAll(asyncIterable, limit = Infinity) {
const results = [];
for await (const item of asyncIterable) {
results.push(item);
if (results.length >= limit) break;
}
return results;
}
const top100Repos = await collectAll(getAllGitHubRepos('microsoft'), 100);
실전 예제 2: Node.js 파일 스트림 읽기
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
// 대용량 로그 파일 한 줄씩 처리
async function* readLines(filePath) {
const fileStream = createReadStream(filePath, { encoding: 'utf-8' });
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity
});
for await (const line of rl) {
yield line;
}
}
// 에러 로그에서 패턴 찾기
async function findErrors(logFile) {
const errors = [];
for await (const line of readLines(logFile)) {
if (line.includes('[ERROR]')) {
errors.push({
timestamp: extractTimestamp(line),
message: extractMessage(line)
});
}
}
return errors;
}
// 통계 계산
async function calculateLogStats(logFile) {
const stats = { errors: 0, warnings: 0, info: 0, total: 0 };
for await (const line of readLines(logFile)) {
stats.total++;
if (line.includes('[ERROR]')) stats.errors++;
else if (line.includes('[WARN]')) stats.warnings++;
else if (line.includes('[INFO]')) stats.info++;
}
return stats;
}
// 메모리 효율적인 파일 처리
async function processLargeFile(inputPath, outputPath) {
const { createWriteStream } = await import('node:fs');
const output = createWriteStream(outputPath);
for await (const line of readLines(inputPath)) {
const processed = transformLine(line);
output.write(processed + '\n');
// 주기적으로 메모리 확인
if (process.memoryUsage().heapUsed > 500 * 1024 * 1024) {
await new Promise(resolve => output.once('drain', resolve));
}
}
await new Promise((resolve, reject) => {
output.end(err => err ? reject(err) : resolve());
});
}
비동기 이터레이터 변환 유틸리티
// map: 각 항목 변환
async function* asyncMap(iterable, fn) {
for await (const item of iterable) {
yield await fn(item);
}
}
// filter: 조건에 맞는 항목만
async function* asyncFilter(iterable, predicate) {
for await (const item of iterable) {
if (await predicate(item)) yield item;
}
}
// take: 처음 N개만
async function* asyncTake(iterable, n) {
let count = 0;
for await (const item of iterable) {
yield item;
if (++count >= n) break;
}
}
// flatMap: 각 항목을 이터러블로 변환 후 평탄화
async function* asyncFlatMap(iterable, fn) {
for await (const item of iterable) {
const result = await fn(item);
for await (const subItem of result) {
yield subItem;
}
}
}
// 체이닝 사용 예시
const pipeline = asyncFilter(
asyncMap(
fetchAllPages('/api/products'),
async (product) => ({
...product,
price: await getDiscountedPrice(product.id)
})
),
(product) => product.price < 50000
);
for await (const affordableProduct of pipeline) {
displayProduct(affordableProduct);
}
에러 처리
// 비동기 이터레이터에서 에러 처리
async function* resilientStream(source) {
try {
for await (const item of source) {
yield item;
}
} catch (err) {
console.error('스트림 에러:', err);
// 에러 후 기본값 yield 가능
yield { error: err.message, timestamp: Date.now() };
} finally {
console.log('스트림 종료');
// 리소스 정리
}
}
// return 메서드: 조기 종료 처리
async function* withCleanup(resource) {
try {
yield* processResource(resource);
} finally {
await resource.close(); // for await...of break 시에도 실행됨
}
}
async function earlyExit() {
const resource = await openResource();
for await (const item of withCleanup(resource)) {
if (shouldStop(item)) break; // finally 블록이 실행되어 resource.close() 호출
process(item);
}
}
고수 팁
1. 비동기 이터레이터 vs Promise.all 선택
// Promise.all: 모든 결과를 한번에 필요할 때
const allResults = await Promise.all(items.map(processItem));
// 비동기 이터레이터: 결과를 즉시 처리하거나 메모리 절약이 필요할 때
for await (const result of processItemsStream(items)) {
displayResult(result); // 완료되는 즉시 표시
}
2. 비동기 이터레이터를 Observable처럼 사용
// WebSocket 메시지를 비동기 이터레이터로
async function* websocketMessages(url) {
const ws = new WebSocket(url);
const messageQueue = [];
let resolve;
let done = false;
ws.onmessage = (event) => {
if (resolve) {
resolve({ value: event.data, done: false });
resolve = null;
} else {
messageQueue.push(event.data);
}
};
ws.onclose = () => {
done = true;
if (resolve) resolve({ value: undefined, done: true });
};
while (!done) {
if (messageQueue.length > 0) {
yield messageQueue.shift();
} else {
const result = await new Promise(r => { resolve = r; });
if (!result.done) yield result.value;
}
}
}
// 사용
for await (const message of websocketMessages('wss://api.example.com')) {
console.log('수신:', message);
}
3. Node.js 스트림과 비동기 이터레이터
// Node.js 스트림은 자동으로 비동기 이터러블
import { createReadStream } from 'node:fs';
const stream = createReadStream('file.txt', { encoding: 'utf-8' });
// for await...of로 바로 사용 가능
for await (const chunk of stream) {
process.stdout.write(chunk);
}