Async Iterators
Async iterators are used when you want to process data sequentially in a streaming fashion rather than loading it all at once. They are especially useful for large datasets, API pagination, and real-time streams.
The Async Iterable Protocol
An async iterable is an object that implements the Symbol.asyncIterator method, which must return an async iterator — an object whose next() method returns a Promise resolving to { value, done }.
// Async iterable protocol structure
// Minimal hand-rolled async iterable: [Symbol.asyncIterator]() returns an
// iterator whose next() resolves to { value, done } after a simulated delay.
const asyncIterable = {
  [Symbol.asyncIterator]() {
    let current = 0;
    const LIMIT = 3;
    return {
      async next() {
        if (current >= LIMIT) {
          return { value: undefined, done: true };
        }
        await delay(100); // Simulating an async operation
        const value = current;
        current += 1;
        return { value, done: false };
      }
    };
  }
};
// Usage: for await...of
// Each iteration awaits the iterator's next() Promise before running the body.
// NOTE: top-level await requires an ES module (or wrap this in an async function).
for await (const value of asyncIterable) {
  console.log(value); // 0, 1, 2 (each 100ms apart)
}
for await...of Syntax
// Basic usage
// Basic usage: consume an async iterable sequentially, one item at a time.
async function processAsyncData() {
  for await (const item of getAsyncIterable()) {
    await processItem(item);
    console.log('Processed:', item);
  }
}
// Can also be used with an array of Promises
// for await...of also accepts a sync iterable of Promises: each Promise is
// awaited before its resolved value is handed to the loop body, in order.
async function processPromises() {
  const endpoints = ['/api/a', '/api/b', '/api/c'];
  const promises = endpoints.map((url) => fetch(url).then((r) => r.json()));
  for await (const result of promises) {
    console.log(result); // Processed in order
  }
}
// Supports break/continue
// Supports break/continue.
// Fix: skip deleted items BEFORE the match check — the original evaluated the
// score first, so a deleted item with score > 90 was reported as a match,
// contradicting the "skip deleted items" intent.
async function findFirst() {
  for await (const item of getLargeDataStream()) {
    if (item.deleted) continue; // Skip deleted items
    if (item.score > 90) {
      console.log('Found:', item);
      break; // Stop processing the rest of the stream
    }
    await processItem(item);
  }
}
Async Generators: async function*
// Basic async generator
// Basic async generator: emits the integers start..end inclusive,
// pausing 100ms before each value.
async function* asyncCounter(start, end) {
  let current = start;
  while (current <= end) {
    await delay(100);
    yield current;
    current += 1;
  }
}
// Usage
// Values arrive lazily: the loop waits on each yielded Promise in turn.
for await (const num of asyncCounter(1, 5)) {
  console.log(num); // 1, 2, 3, 4, 5 (100ms apart)
}
// API pagination generator
// API pagination generator: walks ?page=N until the server reports the final
// page, yielding individual items so callers never see page boundaries.
async function* fetchAllPages(baseUrl) {
  let page = 1;
  let pagination;
  do {
    const response = await fetch(`${baseUrl}?page=${page}&limit=100`);
    const body = await response.json();
    pagination = body.pagination;
    yield* body.data; // Yield one item at a time
    page += 1;
  } while (pagination.page < pagination.totalPages);
}
// Usage: process all users regardless of page
for await (const user of fetchAllPages('/api/users')) {
  // Pages are fetched lazily, only as the loop consumes items.
  await processUser(user);
}
Streaming Data Processing Example
// Streaming parse of a large CSV file
// Streaming parse of a large CSV file: read raw chunks from a ReadableStream,
// decode them, and yield one parsed row per complete line. A partial line at
// the end of a chunk stays buffered until the rest of it arrives.
async function* parseCSVStream(readableStream) {
  const reader = readableStream.getReader();
  const decoder = new TextDecoder();
  let pending = '';
  try {
    for (;;) {
      const chunk = await reader.read();
      if (chunk.done) {
        // Flush whatever is left in the buffer as the final row
        if (pending.trim()) yield parseCSVLine(pending);
        return;
      }
      pending += decoder.decode(chunk.value, { stream: true });
      const pieces = pending.split('\n');
      pending = pieces.pop() ?? ''; // Keep the last incomplete line buffered
      for (const piece of pieces) {
        if (piece.trim()) yield parseCSVLine(piece);
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Split one CSV line into trimmed cell values (naive: no quoted-field support).
function parseCSVLine(line) {
  const cells = line.split(',');
  return cells.map((cell) => cell.trim());
}
// Usage
// Stream-parse the CSV response body; rows are handled without buffering the file.
const response = await fetch('/large-data.csv');
let rowCount = 0;
for await (const row of parseCSVStream(response.body)) {
  rowCount++;
  if (rowCount % 1000 === 0) {
    console.log(`Processed ${rowCount} rows`); // periodic progress report
  }
  await processRow(row);
}
Connecting with the ReadableStream API
// fetch Response provides a ReadableStream
// Stream newline-delimited JSON (NDJSON) from a URL, yielding one parsed
// object per line.
// Fixes over the original:
//  - `yield` requires a generator: declared `async function*` (a plain
//    `async function` containing `yield` is a SyntaxError in module code).
//  - A line can be split across two network chunks; the incomplete tail is
//    now buffered instead of being fed to JSON.parse mid-line.
async function* streamJsonLines(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        // Flush a final line that had no trailing newline
        if (buffer.trim()) yield JSON.parse(buffer);
        break;
      }
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? ''; // Keep the incomplete tail buffered
      for (const line of lines) {
        if (line.trim()) yield JSON.parse(line);
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Processing streaming LLM API responses (ChatGPT style)
// Processing streaming LLM API responses (ChatGPT-style SSE `data:` lines).
// Fix: an SSE line can be split across two network chunks; the original parsed
// each chunk's lines independently, silently dropping tokens whose JSON was
// cut in half. The incomplete trailing line is now buffered across reads.
async function* streamChat(messages) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages })
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? ''; // Keep the incomplete tail for the next chunk
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice(6);
        if (data === '[DONE]') return;
        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices?.[0]?.delta?.content;
          if (content) yield content;
        } catch {
          // Ignore JSON parse errors (e.g. keep-alive/comment lines)
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}
// Update text in the UI in real time
// Update text in the UI in real time: append each streamed token as it arrives.
async function displayStreamingResponse(messages) {
  const output = document.getElementById('output');
  let text = '';
  output.textContent = text; // clear any previous response
  for await (const token of streamChat(messages)) {
    text += token;
    output.textContent = text;
  }
}
Practical Example 1: Automatic Pagination Handling
// General-purpose pagination iterator
// General-purpose cursor pagination: fetchPage(cursor) must resolve to
// { items, nextCursor }; a falsy nextCursor ends the stream.
async function* paginate(fetchPage) {
  let cursor = null;
  let hasNext = true;
  while (hasNext) {
    const page = await fetchPage(cursor);
    yield* page.items;
    cursor = page.nextCursor;
    hasNext = Boolean(cursor);
  }
}
// GitHub API pagination example
// GitHub API pagination example built on paginate().
// Fix: the cursor is interpolated into a query string, so it must be
// URL-encoded — GraphQL-style cursors are base64 and can contain '+', '/', '='.
async function* getAllGitHubRepos(org) {
  yield* paginate(async (cursor) => {
    const base = `/api/github/orgs/${org}/repos`;
    const url = cursor ? `${base}?after=${encodeURIComponent(cursor)}` : base;
    const response = await fetch(url);
    const data = await response.json();
    return {
      items: data.repositories,
      nextCursor: data.pageInfo.hasNextPage ? data.pageInfo.endCursor : null
    };
  });
}
// Process all repositories
let count = 0;
for await (const repo of getAllGitHubRepos('microsoft')) {
  count++;
  if (repo.stars > 10000) {
    // NOTE(review): assumes each item exposes `stars`/`name` — confirm API shape
    console.log(`Popular repo: ${repo.name} (⭐ ${repo.stars})`);
  }
}
console.log(`Finished processing ${count} repositories`);
// Collect into an array
// Drain an async iterable into an array, stopping once `limit` items are held.
// With the default limit of Infinity, the whole stream is collected.
async function collectAll(asyncIterable, limit = Infinity) {
  const collected = [];
  let remaining = limit;
  for await (const entry of asyncIterable) {
    collected.push(entry);
    remaining -= 1;
    if (remaining <= 0) break;
  }
  return collected;
}
// Stops pulling (and therefore stops fetching pages) after 100 items.
const top100Repos = await collectAll(getAllGitHubRepos('microsoft'), 100);
Practical Example 2: Reading Node.js File Streams
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
// Process a large log file line by line
// Process a large file line by line without loading it all into memory.
// readline's interface is itself async-iterable, so we can delegate to it;
// crlfDelay: Infinity makes \r\n count as a single line break.
async function* readLines(filePath) {
  const rl = createInterface({
    input: createReadStream(filePath, { encoding: 'utf-8' }),
    crlfDelay: Infinity
  });
  yield* rl;
}
// Find patterns in error logs
// Scan an error log: collect a { timestamp, message } record for each ERROR line.
async function findErrors(logFile) {
  const errors = [];
  for await (const line of readLines(logFile)) {
    if (!line.includes('[ERROR]')) continue; // only ERROR lines are recorded
    errors.push({
      timestamp: extractTimestamp(line),
      message: extractMessage(line)
    });
  }
  return errors;
}
// Calculate statistics
// Tally line counts by severity in a single streaming pass over the log.
// A line is counted under the first severity tag it contains
// (ERROR, then WARN, then INFO), matching an else-if chain.
async function calculateLogStats(logFile) {
  const stats = { errors: 0, warnings: 0, info: 0, total: 0 };
  const buckets = [
    ['[ERROR]', 'errors'],
    ['[WARN]', 'warnings'],
    ['[INFO]', 'info'],
  ];
  for await (const line of readLines(logFile)) {
    stats.total++;
    const match = buckets.find(([tag]) => line.includes(tag));
    if (match) stats[match[1]]++;
  }
  return stats;
}
// Memory-efficient file processing
// Memory-efficient file processing: stream input lines, transform each, write out.
// Fix: the original awaited 'drain' whenever heap usage was high, but 'drain'
// only fires after a write() call has returned false — if the stream buffer was
// already flushed, that await would never resolve (deadlock). Correct
// backpressure handling uses write()'s return value instead.
async function processLargeFile(inputPath, outputPath) {
  const { createWriteStream } = await import('node:fs');
  const output = createWriteStream(outputPath);
  for await (const line of readLines(inputPath)) {
    const processed = transformLine(line);
    // write() returns false once the internal buffer exceeds highWaterMark;
    // pause until the stream drains before writing more.
    if (!output.write(processed + '\n')) {
      await new Promise(resolve => output.once('drain', resolve));
    }
  }
  // Close the stream and wait for all buffered data to be flushed.
  await new Promise((resolve, reject) => {
    output.end(err => err ? reject(err) : resolve());
  });
}
Async Iterator Transformation Utilities
// map: transform each item
// map: apply fn (sync or async) to each item, preserving order.
async function* asyncMap(iterable, fn) {
  for await (const element of iterable) {
    const mapped = await fn(element);
    yield mapped;
  }
}
// filter: only items that match the condition
// filter: pass through only items for which predicate (sync or async) is truthy.
async function* asyncFilter(iterable, predicate) {
  for await (const element of iterable) {
    const keep = await predicate(element);
    if (!keep) continue;
    yield element;
  }
}
// take: only the first N items
// take: yield at most the first n items, then stop consuming the source.
// Fix: the original yielded before checking the count, so n <= 0 still
// produced one item; a non-positive n now yields nothing.
async function* asyncTake(iterable, n) {
  if (n <= 0) return;
  let taken = 0;
  for await (const item of iterable) {
    yield item;
    if (++taken >= n) break; // break closes the source iterator too
  }
}
// flatMap: transform each item into an iterable, then flatten
// flatMap: map each item to a (sync or async) iterable, then yield its
// elements flattened into a single stream.
async function* asyncFlatMap(iterable, fn) {
  for await (const element of iterable) {
    yield* await fn(element); // yield* handles both sync and async iterables
  }
}
// Chaining usage example
const pipeline = asyncFilter(
  asyncMap(
    fetchAllPages('/api/products'), // source: paginated product stream
    async (product) => ({
      ...product,
      price: await getDiscountedPrice(product.id)
    })
  ),
  (product) => product.price < 50000 // keep only items under 50,000
);
// Lazy: each product flows through map → filter only when the loop pulls it.
for await (const affordableProduct of pipeline) {
  displayProduct(affordableProduct);
}
Error Handling
// Error handling in async iterators
// Error handling in async iterators: forward items from `source`; if the
// source throws, log it and emit one error record instead of propagating.
async function* resilientStream(source) {
  try {
    yield* source; // delegate; a throw from the source lands in catch below
  } catch (err) {
    console.error('Stream error:', err);
    // Can yield a default value after an error
    yield { error: err.message, timestamp: Date.now() };
  } finally {
    console.log('Stream ended');
    // Clean up resources
  }
}
// return method: handling early termination
// return method: handling early termination. Breaking out of for await...of
// calls the generator's return(), which runs this finally block — so the
// resource is closed no matter how iteration ends.
async function* withCleanup(resource) {
  try {
    for await (const item of processResource(resource)) {
      yield item;
    }
  } finally {
    await resource.close(); // Also runs when for await...of breaks
  }
}
// Consume the wrapped stream until the first item that signals a halt;
// the break triggers withCleanup's finally, which closes the resource.
// NOTE(review): `process(item)` shadows Node's global `process` object —
// presumably a placeholder handler name; rename it if this runs under Node.
async function earlyExit() {
  const resource = await openResource();
  for await (const item of withCleanup(resource)) {
    if (!shouldStop(item)) {
      process(item);
      continue;
    }
    break; // finally block runs, calling resource.close()
  }
}
Pro Tips
1. Choosing between async iterators and Promise.all
// Promise.all: when you need all results at once
const allResults = await Promise.all(items.map(processItem)); // fails fast on first rejection
// Async iterator: when you want to process results as they arrive or save memory
for await (const result of processItemsStream(items)) {
  displayResult(result); // Display as soon as each one completes
}
2. Using async iterators like Observables
// WebSocket messages as an async iterator
// Expose WebSocket messages as an async iterator: messages arriving while the
// consumer is busy are queued; otherwise the loop parks on a Promise that the
// next onmessage resolves.
// Fixes over the original:
//  - messages still queued when the socket closed were dropped (`while (!done)`
//    exited with items in the queue) — the queue is now drained first;
//  - breaking out of for await...of never closed the socket — the finally
//    block now calls ws.close();
//  - an error event left the generator parked forever — onerror now ends
//    iteration the same way onclose does.
async function* websocketMessages(url) {
  const ws = new WebSocket(url);
  const messageQueue = [];
  let resolve;
  let done = false;
  ws.onmessage = (event) => {
    if (resolve) {
      resolve({ value: event.data, done: false });
      resolve = null;
    } else {
      messageQueue.push(event.data);
    }
  };
  const finish = () => {
    done = true;
    if (resolve) {
      resolve({ value: undefined, done: true });
      resolve = null;
    }
  };
  ws.onclose = finish;
  ws.onerror = finish; // treat errors as end-of-stream (no message detail kept)
  try {
    while (!done || messageQueue.length > 0) {
      if (messageQueue.length > 0) {
        yield messageQueue.shift();
      } else if (!done) {
        const result = await new Promise(r => { resolve = r; });
        if (!result.done) yield result.value;
      }
    }
  } finally {
    ws.close(); // also runs when the consumer breaks early
  }
}
// Usage
// Each await resumes when the next socket message (or close) arrives.
for await (const message of websocketMessages('wss://api.example.com')) {
  console.log('Received:', message);
}
3. Node.js streams and async iterators
// Node.js streams are automatically async iterables
import { createReadStream } from 'node:fs';
const stream = createReadStream('file.txt', { encoding: 'utf-8' });
// Can be used directly with for await...of — Readable implements Symbol.asyncIterator.
for await (const chunk of stream) {
  process.stdout.write(chunk); // chunk is a string because an encoding was set
}