본문으로 건너뛰기
Advertisement

Node.js 스트림과 버퍼

스트림이란?

스트림(Stream)은 데이터를 작은 청크(chunk)로 나눠 처리하는 방식입니다. 대용량 파일을 다룰 때 전체를 메모리에 올리지 않고 조금씩 읽어 처리할 수 있어 메모리 효율이 뛰어납니다.

메모리에 전체 로드:                  스트림 방식:
┌───────────────────────┐ 청크1 → 처리
│ 1GB 파일 전체 │ 청크2 → 처리
│ (1GB 메모리 필요) │ 청크3 → 처리
└───────────────────────┘ ...

스트림 4종류

1. Readable (읽기 스트림)

데이터 소스에서 읽어오는 스트림입니다.

const fs = require('fs');
const { Readable } = require('stream');

// 파일에서 읽기
const fileStream = fs.createReadStream('./large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024, // 청크 크기: 64KB (기본값: 16KB)
});

// 이벤트 기반 처리
fileStream.on('data', (chunk) => {
console.log(`청크 수신: ${chunk.length} 바이트`);
});

fileStream.on('end', () => {
console.log('파일 읽기 완료');
});

fileStream.on('error', (err) => {
console.error('읽기 에러:', err.message);
});

// 커스텀 Readable 스트림 생성
class CounterStream extends Readable {
constructor(max) {
super({ objectMode: true }); // 객체도 스트리밍 가능
this.max = max;
this.current = 0;
}

_read() {
if (this.current < this.max) {
this.push(this.current++);
} else {
this.push(null); // 스트림 종료
}
}
}

const counter = new CounterStream(5);
counter.on('data', (num) => console.log('숫자:', num));
counter.on('end', () => console.log('카운터 완료'));

2. Writable (쓰기 스트림)

데이터를 목적지에 쓰는 스트림입니다.

const fs = require('fs');
const { Writable } = require('stream');

// 파일에 쓰기
const writeStream = fs.createWriteStream('./output.txt', {
encoding: 'utf8',
flags: 'w', // 'a' for append
});

writeStream.write('첫 번째 줄\n');
writeStream.write('두 번째 줄\n');
writeStream.end('마지막 줄\n'); // 쓰기 완료

writeStream.on('finish', () => console.log('파일 쓰기 완료'));
writeStream.on('error', (err) => console.error('쓰기 에러:', err.message));

// 커스텀 Writable 스트림
class LogStream extends Writable {
constructor(prefix) {
super({ decodeStrings: false }); // 문자열 그대로 수신
this.prefix = prefix;
this.lineCount = 0;
}

_write(chunk, encoding, callback) {
this.lineCount++;
console.log(`[${this.prefix}] #${this.lineCount}: ${chunk.toString().trim()}`);
callback(); // 다음 청크 처리 준비
}

_final(callback) {
console.log(`[${this.prefix}] 총 ${this.lineCount}줄 처리`);
callback();
}
}

const logger = new LogStream('APP');
logger.write('시작합니다\n');
logger.write('처리 중...\n');
logger.end('완료!\n');

3. Duplex (이중 스트림)

읽기와 쓰기를 모두 지원합니다. 네트워크 소켓이 대표적인 예입니다.

const { Duplex } = require('stream');

class EchoStream extends Duplex {
constructor() {
super();
this.buffer = [];
}

_write(chunk, encoding, callback) {
// 수신한 데이터를 버퍼에 저장
this.buffer.push(chunk);
callback();
}

_read(size) {
// 버퍼에서 데이터 꺼내기
if (this.buffer.length > 0) {
this.push(this.buffer.shift());
}
}
}

const echo = new EchoStream();
echo.write('Hello');
echo.write(' World');
echo.on('data', (chunk) => console.log('Echo:', chunk.toString()));

4. Transform (변환 스트림)

입력을 변환해 출력합니다. gzip 압축, 암호화 등이 여기에 해당합니다.

const { Transform } = require('stream');
const { createGzip, createGunzip } = require('zlib');
const fs = require('fs');

// 내장 Transform: gzip 압축
const gzip = createGzip();
const source = fs.createReadStream('./input.txt');
const dest = fs.createWriteStream('./input.txt.gz');
source.pipe(gzip).pipe(dest);
dest.on('finish', () => console.log('압축 완료'));

// 커스텀 Transform: 대문자 변환
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
}

// 커스텀 Transform: CSV 파싱
class CsvParser extends Transform {
constructor() {
super({ objectMode: true }); // 문자열 입력 → 객체 출력
this.headers = null;
this.remainder = '';
}

_transform(chunk, encoding, callback) {
const text = this.remainder + chunk.toString();
const lines = text.split('\n');
this.remainder = lines.pop(); // 마지막 불완전 줄 보관

for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',').map(v => v.trim());

if (!this.headers) {
this.headers = values;
} else {
const obj = Object.fromEntries(
this.headers.map((h, i) => [h, values[i]])
);
this.push(obj);
}
}
callback();
}

_flush(callback) {
// 마지막 남은 데이터 처리
if (this.remainder.trim() && this.headers) {
const values = this.remainder.split(',').map(v => v.trim());
const obj = Object.fromEntries(
this.headers.map((h, i) => [h, values[i]])
);
this.push(obj);
}
callback();
}
}

pipe()로 스트림 연결

const fs = require('fs');
const { createGzip } = require('zlib');
const { createCipheriv, randomBytes } = require('crypto');

// 간단한 파이프
fs.createReadStream('./input.txt')
.pipe(fs.createWriteStream('./output.txt'));

// 체인 파이프: 읽기 → 압축 → 쓰기
fs.createReadStream('./input.txt')
.pipe(createGzip())
.pipe(fs.createWriteStream('./input.txt.gz'));

// 에러 처리가 빠진 pipe의 함정
const src = fs.createReadStream('./input.txt');
const gz = createGzip();
const dest = fs.createWriteStream('./output.gz');

// 에러가 발생하면 pipe는 자동으로 정리하지 않음
src.on('error', cleanup);
gz.on('error', cleanup);
dest.on('error', cleanup);

function cleanup(err) {
console.error('에러:', err.message);
src.destroy();
gz.destroy();
dest.destroy();
}

src.pipe(gz).pipe(dest);
dest.on('finish', () => console.log('완료'));

stream/promises: pipeline()

pipeline()은 pipe보다 안전합니다. 에러 발생 시 모든 스트림을 자동으로 정리합니다.

const { pipeline } = require('stream/promises');
const fs = require('fs');
const { createGzip, createGunzip } = require('zlib');
const { Transform } = require('stream');

// 파일 압축
async function compressFile(input, output) {
await pipeline(
fs.createReadStream(input),
createGzip(),
fs.createWriteStream(output)
);
console.log(`압축 완료: ${input}${output}`);
}

// 파일 압축 해제
async function decompressFile(input, output) {
await pipeline(
fs.createReadStream(input),
createGunzip(),
fs.createWriteStream(output)
);
console.log(`해제 완료: ${input}${output}`);
}

// 대용량 CSV 스트리밍 처리
async function processCsvStream(inputPath, outputPath) {
const lineCounter = new Transform({
objectMode: true,
transform(record, enc, callback) {
// 각 레코드 처리
const processed = {
...record,
name: record.name?.toUpperCase(),
processed_at: new Date().toISOString(),
};
callback(null, JSON.stringify(processed) + '\n');
},
});

await pipeline(
fs.createReadStream(inputPath),
new CsvParser(), // 위에서 정의한 클래스
lineCounter,
fs.createWriteStream(outputPath)
);
}

// 실행
compressFile('./data.txt', './data.txt.gz').catch(console.error);

Buffer vs String 인코딩

const { Buffer } = require('buffer');

// Buffer 생성
const buf1 = Buffer.from('안녕하세요', 'utf8');
const buf2 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]);
const buf3 = Buffer.alloc(10); // 0으로 채워진 10바이트
const buf4 = Buffer.allocUnsafe(10); // 초기화하지 않은 버퍼 (빠름, 주의)

// Buffer → 문자열 변환
console.log(buf1.toString('utf8')); // 안녕하세요
console.log(buf2.toString('ascii')); // Hello
console.log(buf1.toString('base64')); // base64 인코딩
console.log(buf1.toString('hex')); // hex 인코딩

// 문자열 → Buffer
const buf5 = Buffer.from('Hello World');
console.log(buf5.length); // 11 (바이트 수, UTF-8 기준)

// 한국어는 UTF-8에서 3바이트
const kor = Buffer.from('한');
console.log(kor.length); // 3

// Buffer 조작
const a = Buffer.from('Hello');
const b = Buffer.from(' World');
const combined = Buffer.concat([a, b]);
console.log(combined.toString()); // Hello World

// Buffer 비교
const x = Buffer.from('ABC');
const y = Buffer.from('ABC');
console.log(x.equals(y)); // true
console.log(x === y); // false (다른 객체)

// 인코딩 감지
const encoded = 'SGVsbG8gV29ybGQ='; // base64
const decoded = Buffer.from(encoded, 'base64').toString('utf8');
console.log(decoded); // Hello World

실전: 대용량 파일 처리

대용량 로그 파일 분석

const fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform, Writable } = require('stream');
const readline = require('readline');

async function analyzeLogFile(logPath) {
const stats = {
total: 0,
errors: 0,
warnings: 0,
info: 0,
ipCounts: new Map(),
};

// readline 인터페이스로 한 줄씩 처리
const rl = readline.createInterface({
input: fs.createReadStream(logPath),
crlfDelay: Infinity,
});

for await (const line of rl) {
stats.total++;

if (line.includes('ERROR')) stats.errors++;
else if (line.includes('WARN')) stats.warnings++;
else if (line.includes('INFO')) stats.info++;

// IP 추출
const ipMatch = line.match(/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/);
if (ipMatch) {
const ip = ipMatch[0];
stats.ipCounts.set(ip, (stats.ipCounts.get(ip) || 0) + 1);
}
}

// 상위 IP 출력
const topIPs = [...stats.ipCounts.entries()]
.sort((a, b) => b[1] - a[1])
.slice(0, 5);

return { ...stats, topIPs };
}

analyzeLogFile('./access.log').then(result => {
console.log('로그 분석 결과:');
console.log(` 전체: ${result.total}`);
console.log(` 에러: ${result.errors}`);
console.log(` 경고: ${result.warnings}`);
console.log(' 상위 IP:', result.topIPs);
}).catch(console.error);

파일 청크 분할

const fs = require('fs/promises');
const { createReadStream, createWriteStream } = require('fs');
const { pipeline } = require('stream/promises');
const path = require('path');

async function splitFile(inputPath, chunkSizeMB = 10) {
const chunkSize = chunkSizeMB * 1024 * 1024;
const stat = await fs.stat(inputPath);
const totalChunks = Math.ceil(stat.size / chunkSize);
const ext = path.extname(inputPath);
const base = path.basename(inputPath, ext);
const dir = path.dirname(inputPath);

console.log(`파일 크기: ${(stat.size / 1024 / 1024).toFixed(1)} MB`);
console.log(`청크 수: ${totalChunks}`);

for (let i = 0; i < totalChunks; i++) {
const start = i * chunkSize;
const end = Math.min(start + chunkSize - 1, stat.size - 1);
const outputPath = path.join(dir, `${base}.part${String(i + 1).padStart(3, '0')}${ext}`);

await pipeline(
createReadStream(inputPath, { start, end }),
createWriteStream(outputPath)
);
console.log(`청크 ${i + 1}/${totalChunks} 생성: ${outputPath}`);
}
}

splitFile('./large-video.mp4', 50).catch(console.error);

고수 팁

백프레셔(Backpressure) 처리

스트림 처리 속도보다 데이터 생성 속도가 빠를 때 백프레셔가 발생합니다:

const fs = require('fs');

const readStream = fs.createReadStream('./large-file.txt');
const writeStream = fs.createWriteStream('/dev/null');

readStream.on('data', (chunk) => {
// write()가 false를 반환하면 버퍼가 가득 찬 것
const canContinue = writeStream.write(chunk);

if (!canContinue) {
console.log('백프레셔: 읽기 일시 중지');
readStream.pause(); // 읽기 중지

writeStream.once('drain', () => {
console.log('버퍼 비워짐: 읽기 재개');
readStream.resume(); // 읽기 재개
});
}
});

readStream.on('end', () => writeStream.end());

// pipeline() 사용 시 자동으로 처리됨

스트림 메모리 사용량 측정

const { Readable } = require('stream');

// 1GB 데이터를 스트림으로 처리
async function processWithStream() {
let totalBytes = 0;

const stream = Readable.from(generateData(1000)); // 1000개 청크

for await (const chunk of stream) {
totalBytes += chunk.length;
// 메모리에는 청크 하나만 올라옴
}

return totalBytes;
}

function* generateData(count) {
for (let i = 0; i < count; i++) {
yield Buffer.alloc(1024 * 1024, i % 256); // 1MB 청크
}
}

const before = process.memoryUsage().heapUsed;
processWithStream().then(total => {
const after = process.memoryUsage().heapUsed;
const memoryInMB = (after - before) / 1024 / 1024;
console.log(`처리된 데이터: ${total / 1024 / 1024} MB`);
console.log(`사용된 메모리: ${memoryInMB.toFixed(1)} MB`);
});
Advertisement