Skip to main content

Node.js Streams and Buffers

What Is a Stream?

A stream processes data by splitting it into small chunks. When working with large files, you can read and process data piece by piece without loading the entire file into memory, making it highly memory-efficient.

Loading everything into memory:       Stream approach:
┌───────────────────────┐             chunk1 → process
│ 1GB file (whole)      │             chunk2 → process
│ (needs 1GB memory)    │             chunk3 → process
└───────────────────────┘             ...

4 Types of Streams

1. Readable (Read Stream)

A stream that reads data from a source.

const fs = require('fs');
const { Readable } = require('stream');

// Read a file as UTF-8 text, one chunk at a time.
const fileStream = fs.createReadStream('./large-file.txt', {
  encoding: 'utf8', // chunks are delivered as strings instead of Buffers
  highWaterMark: 64 * 1024, // chunk size: 64KB (this is already the default for fs.createReadStream)
});

// Event-based processing
fileStream.on('data', (chunk) => {
  // Because an encoding is set, `chunk` is a string and `chunk.length` would
  // count UTF-16 code units. Buffer.byteLength gives the size in bytes.
  console.log(`Chunk received: ${Buffer.byteLength(chunk, 'utf8')} bytes`);
});

fileStream.on('end', () => {
  console.log('File read complete');
});

// Without an 'error' listener a missing or unreadable file would throw.
fileStream.on('error', (err) => {
  console.error('Read error:', err.message);
});

// Create a custom Readable stream
/**
 * Object-mode Readable that emits the integers 0, 1, ... max - 1 and then ends.
 *
 * @param {number} max - Exclusive upper bound of the emitted sequence.
 */
class CounterStream extends Readable {
  constructor(max) {
    // objectMode lets the stream carry plain numbers instead of Buffers/strings.
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  /** Supplies one value per request; pushes null once the bound is reached. */
  _read() {
    const exhausted = !(this.current < this.max);
    if (exhausted) {
      this.push(null); // signals end-of-stream
      return;
    }

    const value = this.current;
    this.current += 1;
    this.push(value);
  }
}

// Print the numbers 0 through 4, then report completion.
const counter = new CounterStream(5);
counter
  .on('data', (num) => {
    console.log('Number:', num);
  })
  .on('end', () => {
    console.log('Counter complete');
  });

2. Writable (Write Stream)

A stream that writes data to a destination.

const fs = require('fs');
const { Writable } = require('stream');

// Write three lines to a file, truncating any existing content.
const writeStream = fs.createWriteStream('./output.txt', {
  encoding: 'utf8',
  flags: 'w', // use 'a' to append instead
});

for (const line of ['First line\n', 'Second line\n']) {
  writeStream.write(line);
}
writeStream.end('Last line\n'); // writes the final chunk and closes the stream

writeStream
  .on('finish', () => {
    console.log('File write complete');
  })
  .on('error', (err) => {
    console.error('Write error:', err.message);
  });

// Custom Writable stream
/**
 * Writable that prints each written chunk to the console as a numbered,
 * prefixed line and prints a total when the stream is ended.
 *
 * @param {string} prefix - Label shown in square brackets on every line.
 */
class LogStream extends Writable {
  constructor(prefix) {
    // decodeStrings: false hands written strings to _write() unconverted.
    super({ decodeStrings: false });
    this.prefix = prefix;
    this.lineCount = 0;
  }

  /** Logs one chunk, then acknowledges it so the next chunk can be written. */
  _write(chunk, encoding, callback) {
    this.lineCount += 1;
    const text = chunk.toString().trim();
    console.log(`[${this.prefix}] #${this.lineCount}: ${text}`);
    callback();
  }

  /** Called once after end(), before 'finish': prints the summary line. */
  _final(callback) {
    const { prefix, lineCount } = this;
    console.log(`[${prefix}] Processed ${lineCount} lines total`);
    callback();
  }
}

// Log three messages through the custom Writable; end() sends the last one.
const logger = new LogStream('APP');
for (const message of ['Starting\n', 'Processing...\n']) {
  logger.write(message);
}
logger.end('Done!\n');

3. Duplex (Bidirectional Stream)

Supports both reading and writing. A network socket is a classic example.

const { Duplex } = require('stream');

/**
 * Duplex stream that echoes its input: every chunk written to the writable
 * side becomes available, in order, on the readable side. Ending the writable
 * side ends the readable side once all buffered chunks have been delivered.
 */
class EchoStream extends Duplex {
  constructor() {
    super();
    this.buffer = []; // chunks written but not yet handed to the readable side
    this.readPending = false; // true when _read() ran and found nothing to deliver
  }

  /**
   * Receives a written chunk. If a reader is already waiting, the chunk is
   * pushed straight through; otherwise it is queued for the next _read().
   */
  _write(chunk, encoding, callback) {
    if (this.readPending) {
      // _read() is not called again until something is pushed, so a chunk that
      // arrives after an empty _read() must be pushed here or it is never delivered.
      // Clear the flag first: push() may trigger another _read() synchronously.
      this.readPending = false;
      this.push(chunk);
    } else {
      this.buffer.push(chunk);
    }
    callback();
  }

  /** Delivers the oldest queued chunk, or records that a reader is waiting. */
  _read(size) {
    if (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    } else {
      this.readPending = true;
    }
  }

  /** Runs when the writable side ends: drain the queue, then end the readable side. */
  _final(callback) {
    while (this.buffer.length > 0) {
      this.push(this.buffer.shift());
    }
    this.push(null);
    callback();
  }
}

// Write two chunks, then read them back from the same stream.
const echo = new EchoStream();
['Hello', ' World'].forEach((text) => {
  echo.write(text);
});
echo.on('data', (chunk) => {
  console.log('Echo:', chunk.toString());
});

4. Transform (Transform Stream)

Transforms input and produces output. gzip compression and encryption fall into this category.

const { Transform } = require('stream');
const { createGzip, createGunzip } = require('zlib');
const fs = require('fs');

// Built-in Transform: compress input.txt to input.txt.gz with gzip
const gzip = createGzip();
const source = fs.createReadStream('./input.txt');
const dest = fs.createWriteStream('./input.txt.gz');

// pipe() returns its destination, so the chain can be written step by step.
const compressed = source.pipe(gzip);
compressed.pipe(dest);

dest.on('finish', () => {
  console.log('Compression complete');
});

// Custom Transform: uppercase conversion
/**
 * Transform that upper-cases UTF-8 text.
 *
 * Chunk boundaries fall at arbitrary byte offsets, so a multi-byte character
 * can be split across two chunks. Decoding each chunk on its own would turn
 * both halves into replacement characters, so a streaming decoder is used.
 */
class UpperCaseTransform extends Transform {
  // ignoreBOM: true keeps a leading byte-order mark in the output, matching
  // what chunk.toString() produces.
  #decoder = new TextDecoder('utf-8', { ignoreBOM: true });

  _transform(chunk, encoding, callback) {
    // Strings are already decoded; only binary chunks go through the decoder.
    const text =
      typeof chunk === 'string'
        ? chunk
        : this.#decoder.decode(chunk, { stream: true });
    if (text.length > 0) {
      this.push(text.toUpperCase());
    }
    callback();
  }

  /** Emits whatever the decoder still holds when the input ends mid-character. */
  _flush(callback) {
    const tail = this.#decoder.decode();
    if (tail.length > 0) {
      this.push(tail.toUpperCase());
    }
    callback();
  }
}

// Custom Transform: CSV parsing
/**
 * Transform that turns simple comma-separated text into one object per row.
 *
 * The first non-blank line supplies the property names; every later non-blank
 * line is pushed as an object keyed by those names. Fields are split on ','
 * and trimmed. Quoted fields and embedded commas are not supported.
 */
class CsvParser extends Transform {
  // Streaming decoder so a multi-byte UTF-8 character split across two chunks
  // is reassembled instead of becoming replacement characters.
  #decoder = new TextDecoder('utf-8', { ignoreBOM: true });

  constructor() {
    super({ objectMode: true }); // string input → object output
    this.headers = null; // column names, set from the first non-blank line
    this.remainder = ''; // trailing text of the last chunk with no newline yet
  }

  _transform(chunk, encoding, callback) {
    const lines = (this.remainder + this.#decode(chunk)).split('\n');
    this.remainder = lines.pop(); // keep last incomplete line
    for (const line of lines) {
      this.#handleLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Process the final line when the input does not end with a newline.
    const lastLine = this.remainder + this.#decoder.decode();
    this.remainder = '';
    this.#handleLine(lastLine);
    callback();
  }

  /** Converts an incoming chunk to text; Buffers go through the streaming decoder. */
  #decode(chunk) {
    if (typeof chunk === 'string') return chunk;
    if (Buffer.isBuffer(chunk)) return this.#decoder.decode(chunk, { stream: true });
    return chunk.toString();
  }

  /** Parses one line: the first becomes the header row, later ones are pushed as objects. */
  #handleLine(line) {
    if (!line.trim()) return; // skip blank lines
    const values = line.split(',').map((value) => value.trim());

    if (!this.headers) {
      this.headers = values;
      return;
    }
    this.push(
      Object.fromEntries(this.headers.map((header, i) => [header, values[i]]))
    );
  }
}

Connecting Streams with pipe()

const fs = require('fs');
const { createGzip } = require('zlib');
const { createCipheriv, randomBytes } = require('crypto');

// Simple pipe: copy input.txt to output.txt
fs.createReadStream('./input.txt')
  .pipe(fs.createWriteStream('./output.txt'));

// Chained pipe: read → compress → write
fs.createReadStream('./input.txt')
  .pipe(createGzip())
  .pipe(fs.createWriteStream('./input.txt.gz'));

// The pitfall of pipe without error handling:
// pipe() neither forwards errors nor destroys the other streams in the chain,
// so each stream needs its own 'error' listener and manual teardown.
const src = fs.createReadStream('./input.txt');
const gz = createGzip();
const dest = fs.createWriteStream('./output.gz');

/** Logs the failure and destroys every stream in the chain. */
function cleanup(err) {
  console.error('Error:', err.message);
  for (const stream of [src, gz, dest]) {
    stream.destroy();
  }
}

for (const stream of [src, gz, dest]) {
  stream.on('error', cleanup);
}

src.pipe(gz).pipe(dest);
dest.on('finish', () => {
  console.log('Done');
});

stream/promises: pipeline()

pipeline() is safer than pipe. On error, it automatically cleans up all streams.

const { pipeline } = require('stream/promises');
const fs = require('fs');
const { createGzip, createGunzip } = require('zlib');
const { Transform } = require('stream');

// Compress a file
/**
 * Gzip-compresses a file by streaming it, so the file is never fully in memory.
 *
 * @param {string} input - Path of the file to compress.
 * @param {string} output - Path of the .gz file to create (overwritten if present).
 * @returns {Promise<void>} Resolves when the output is fully written; rejects
 *   if any stage of the pipeline fails.
 */
async function compressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    createGzip(),
    fs.createWriteStream(output)
  );
  // Separate the two paths; they previously ran together in the message.
  console.log(`Compression complete: ${input} → ${output}`);
}

// Decompress a file
/**
 * Decompresses a gzip file by streaming it through gunzip.
 *
 * @param {string} input - Path of the .gz file to read.
 * @param {string} output - Path of the decompressed file to create (overwritten if present).
 * @returns {Promise<void>} Resolves when the output is fully written; rejects
 *   if any stage of the pipeline fails (for example, the input is not valid gzip).
 */
async function decompressFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    createGunzip(),
    fs.createWriteStream(output)
  );
  // Separate the two paths; they previously ran together in the message.
  console.log(`Decompression complete: ${input} → ${output}`);
}

// Stream-process a large CSV file
/**
 * Streams a CSV file through CsvParser and writes one JSON object per line.
 *
 * Each record gets its `name` upper-cased (when present) and a `processed_at`
 * ISO timestamp added.
 *
 * @param {string} inputPath - CSV file to read.
 * @param {string} outputPath - File that receives the newline-delimited JSON.
 * @returns {Promise<void>} Resolves when the whole pipeline has finished.
 */
async function processCsvStream(inputPath, outputPath) {
  // Turns one parsed record into its serialized output line.
  const toJsonLine = (record) => {
    const processed = {
      ...record,
      name: record.name?.toUpperCase(),
      processed_at: new Date().toISOString(),
    };
    return JSON.stringify(processed) + '\n';
  };

  const recordFormatter = new Transform({
    objectMode: true,
    transform(record, enc, callback) {
      callback(null, toJsonLine(record));
    },
  });

  const stages = [
    fs.createReadStream(inputPath),
    new CsvParser(), // class defined above
    recordFormatter,
    fs.createWriteStream(outputPath),
  ];
  await pipeline(...stages);
}

// Run: compress data.txt and report any failure on stderr
compressFile('./data.txt', './data.txt.gz').catch((err) => {
  console.error(err);
});

Buffer vs String Encoding

const { Buffer } = require('buffer');

// Four ways to create a Buffer
const buf1 = Buffer.from('Hello World', 'utf8'); // from a string with an explicit encoding
const buf2 = Buffer.from([0x48, 0x65, 0x6c, 0x6c, 0x6f]); // from raw byte values
const buf3 = Buffer.alloc(10); // 10 bytes filled with 0
const buf4 = Buffer.allocUnsafe(10); // uninitialized buffer (fast, use with care)

// Buffer → string conversion: prints "Hello World", "Hello", then the
// base64 and hex encodings of buf1
const conversions = [
  [buf1, 'utf8'],
  [buf2, 'ascii'],
  [buf1, 'base64'],
  [buf1, 'hex'],
];
for (const [buffer, encoding] of conversions) {
  console.log(buffer.toString(encoding));
}

// String → Buffer (UTF-8 is the default encoding)
const buf5 = Buffer.from('Hello World');
console.log(buf5.length); // 11 (byte count in UTF-8)

// This Korean character takes 3 bytes in UTF-8
const kor = Buffer.from('한');
console.log(kor.length); // 3

// Buffer manipulation: concatenate two buffers into a new one
const [a, b] = ['Hello', ' World'].map((text) => Buffer.from(text));
const combined = Buffer.concat([a, b]);
console.log(String(combined)); // Hello World

// Buffer comparison: equals() compares contents, identity compares references
const x = Buffer.from('ABC');
const y = Buffer.from('ABC');
console.log(x.equals(y)); // true
console.log(Object.is(x, y)); // false (different objects)

// Decoding base64 text back into a UTF-8 string
const encoded = 'SGVsbG8gV29ybGQ='; // base64
const decodedBytes = Buffer.from(encoded, 'base64');
const decoded = decodedBytes.toString('utf8');
console.log(decoded); // Hello World

Practical: Processing Large Files

Analyzing a Large Log File

const fs = require('fs');
const { pipeline } = require('stream/promises');
const { Transform, Writable } = require('stream');
const readline = require('readline');

/**
 * Reads a log file line by line and tallies severity levels and IP addresses.
 *
 * A line counts toward exactly one level, checked in the order ERROR, WARN,
 * INFO. The first dotted-quad-looking token on a line is counted as its IP.
 *
 * @param {string} logPath - Path of the log file to analyze.
 * @returns {Promise<{total: number, errors: number, warnings: number,
 *   info: number, ipCounts: Map<string, number>, topIPs: Array<[string, number]>}>}
 *   Totals, plus the five most frequent IPs as [ip, count] pairs, most frequent first.
 */
async function analyzeLogFile(logPath) {
  const ipPattern = /\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/;
  const ipCounts = new Map();
  let total = 0;
  let errors = 0;
  let warnings = 0;
  let info = 0;

  // readline yields one line at a time; crlfDelay: Infinity treats \r\n as a single break.
  const lineReader = readline.createInterface({
    input: fs.createReadStream(logPath),
    crlfDelay: Infinity,
  });

  for await (const entry of lineReader) {
    total += 1;

    if (entry.includes('ERROR')) {
      errors += 1;
    } else if (entry.includes('WARN')) {
      warnings += 1;
    } else if (entry.includes('INFO')) {
      info += 1;
    }

    const found = ipPattern.exec(entry);
    if (found) {
      const [address] = found;
      ipCounts.set(address, (ipCounts.get(address) || 0) + 1);
    }
  }

  // Rank by descending hit count and keep the first five.
  const ranked = Array.from(ipCounts.entries());
  ranked.sort((left, right) => right[1] - left[1]);
  const topIPs = ranked.slice(0, 5);

  return { total, errors, warnings, info, ipCounts, topIPs };
}

// Analyze access.log and print a short report; failures go to stderr.
analyzeLogFile('./access.log')
  .then(({ total, errors, warnings, topIPs }) => {
    console.log('Log analysis results:');
    console.log(` Total: ${total} lines`);
    console.log(` Errors: ${errors}`);
    console.log(` Warnings: ${warnings}`);
    console.log(' Top IPs:', topIPs);
  })
  .catch(console.error);

Splitting a File into Chunks

const fs = require('fs/promises');
const { createReadStream, createWriteStream } = require('fs');
const { pipeline } = require('stream/promises');
const path = require('path');

/**
 * Splits a file into consecutive parts of at most `chunkSizeMB` megabytes.
 *
 * Parts are written next to the input as `<base>.partNNN<ext>`, numbered from
 * 001 and created one after another. An empty input produces no parts.
 *
 * @param {string} inputPath - File to split.
 * @param {number} [chunkSizeMB=10] - Maximum part size in MiB; fractions are
 *   allowed as long as the size is at least one byte.
 * @returns {Promise<string[]>} Paths of the created parts, in order.
 * @throws {RangeError} If `chunkSizeMB` does not give a finite size of at least one byte.
 */
async function splitFile(inputPath, chunkSizeMB = 10) {
  // Stream offsets must be integers, so round fractional sizes down to whole bytes.
  const chunkSize = Math.floor(chunkSizeMB * 1024 * 1024);
  if (!Number.isFinite(chunkSize) || chunkSize < 1) {
    // A zero size would make totalChunks infinite and loop forever.
    throw new RangeError(`chunkSizeMB must be a positive number, got ${chunkSizeMB}`);
  }

  const stat = await fs.stat(inputPath);
  const totalChunks = Math.ceil(stat.size / chunkSize);
  const ext = path.extname(inputPath);
  const base = path.basename(inputPath, ext);
  const dir = path.dirname(inputPath);
  // At least three digits; widen when there are 1000+ parts so names still sort in order.
  const padWidth = Math.max(3, String(totalChunks).length);

  console.log(`File size: ${(stat.size / 1024 / 1024).toFixed(1)} MB`);
  console.log(`Number of chunks: ${totalChunks}`);

  const outputPaths = [];
  for (let i = 0; i < totalChunks; i++) {
    const start = i * chunkSize;
    // createReadStream's `end` is inclusive, hence the -1; clamp to the last byte.
    const end = Math.min(start + chunkSize - 1, stat.size - 1);
    const partNumber = String(i + 1).padStart(padWidth, '0');
    const outputPath = path.join(dir, `${base}.part${partNumber}${ext}`);

    await pipeline(
      createReadStream(inputPath, { start, end }),
      createWriteStream(outputPath)
    );
    outputPaths.push(outputPath);
    console.log(`Chunk ${i + 1}/${totalChunks} created: ${outputPath}`);
  }

  return outputPaths;
}

// Split the video into parts of up to 50 MB; report any failure on stderr.
splitFile('./large-video.mp4', 50).catch((err) => {
  console.error(err);
});

Pro Tips

Handling Backpressure

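Backpressure occurs when a readable source produces data faster than the writable destination can consume it, so unwritten chunks pile up in memory. The example below handles it manually: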

const fs = require('fs');

// Copy a file by hand, pausing the reader whenever the writer's buffer fills.
const readStream = fs.createReadStream('./large-file.txt');
const writeStream = fs.createWriteStream('/dev/null');

readStream.on('data', (chunk) => {
  // write() returns true while the writer can accept more data.
  if (writeStream.write(chunk)) {
    return;
  }

  // The buffer is full: stop reading until the writer has drained it.
  console.log('Backpressure: pausing read');
  readStream.pause();

  writeStream.once('drain', () => {
    console.log('Buffer drained: resuming read');
    readStream.resume();
  });
});

// Close the writer once the source has been fully read.
readStream.on('end', () => {
  writeStream.end();
});

// When using pipeline(), this is handled automatically

Measuring Stream Memory Usage

const { Readable } = require('stream');

// Process 1GB of data using streams
/**
 * Consumes 1000 generated chunks through a Readable and adds up their sizes.
 *
 * @returns {Promise<number>} Total number of bytes seen across all chunks.
 */
async function processWithStream() {
  const stream = Readable.from(generateData(1000)); // 1000 chunks
  let totalBytes = 0;

  // Chunks are consumed one at a time rather than collected into one array.
  for await (const { length } of stream) {
    totalBytes += length;
  }

  return totalBytes;
}

/**
 * Lazily yields `count` Buffers of 1 MiB each. Chunk i is filled with the
 * byte value i % 256.
 *
 * @param {number} count - Number of chunks to produce.
 * @yields {Buffer} A 1 MiB buffer.
 */
function* generateData(count) {
  const chunkBytes = 1024 * 1024; // 1MB chunk
  let index = 0;
  while (index < count) {
    yield Buffer.alloc(chunkBytes, index % 256);
    index += 1;
  }
}

// Compare V8 heap usage before and after streaming all chunks.
// Note: heapUsed covers JavaScript objects only. Buffer contents are allocated
// outside the V8 heap and are reported under `external` / `arrayBuffers` in
// process.memoryUsage(), so they are not included in this figure.
const before = process.memoryUsage().heapUsed;
processWithStream()
  .then((total) => {
    const after = process.memoryUsage().heapUsed;
    const memoryInMB = (after - before) / 1024 / 1024;
    console.log(`Data processed: ${total / 1024 / 1024} MB`);
    console.log(`Memory used: ${memoryInMB.toFixed(1)} MB`);
  })
  // Report failures instead of leaving an unhandled rejection, as the other examples do.
  .catch(console.error);