Skip to main content
Advertisement

Memory-Efficient Data Processing

When processing large volumes of data, the key is to use a streaming approach rather than loading everything into memory at once. Even a 10 GB file can be processed with a constant memory footprint.


Processing Large Files with Generators

The naive approach loads the entire file into memory, but generators process one line at a time, keeping memory usage constant.

from pathlib import Path


# ❌ Memory-inefficient — loads the entire file into memory
def count_lines_bad(filepath: str) -> int:
    """Count lines by materializing every line at once (anti-pattern demo)."""
    with open(filepath, "r", encoding="utf-8") as f:
        all_lines = f.readlines()  # Entire file buffered in RAM at once
        return len(all_lines)


# ✅ Memory-efficient — streaming one line at a time
def count_lines_good(filepath: str) -> int:
    """Count lines by iterating the file object lazily (constant memory)."""
    with open(filepath, "r", encoding="utf-8") as f:
        # A file object is its own iterator — each step reads just one line
        return sum(1 for _ in f)


# ✅ Generator pipeline — chain multiple processing stages with lazy evaluation
def read_lines(filepath: str, encoding: str = "utf-8"):
    """Yield file lines one at a time, trailing newline removed."""
    with open(filepath, "r", encoding=encoding) as f:
        yield from (raw.rstrip("\n") for raw in f)


def filter_empty(lines):
    """Drop blank/whitespace-only lines — next stage of the lazy pipeline."""
    yield from (ln for ln in lines if ln.strip())


def parse_log_line(lines):
    """Parse log lines into structured records.

    Yields a dict with "timestamp", "level" and "message" for each line
    matching ``YYYY-MM-DD HH:MM:SS [LEVEL] message``; non-matching lines
    are silently skipped.
    """
    import re
    # Compile once up front instead of passing a raw pattern string to
    # re.match for every line — skips the per-call pattern-cache lookup
    # in the hot loop.
    pattern = re.compile(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.*)")
    for line in lines:
        match = pattern.match(line)
        if match:
            yield {
                "timestamp": match.group(1),
                "level": match.group(2),
                "message": match.group(3),
            }


def filter_errors(records):
    """Only pass through ERROR and CRITICAL level records."""
    for rec in records:
        if rec["level"] == "ERROR" or rec["level"] == "CRITICAL":
            yield rec


# Assemble pipeline — only the currently processed record exists in memory
def process_log_file(filepath: str) -> list[dict]:
    """Run the full lazy pipeline over a log file and collect error records."""
    stage = read_lines(filepath)
    stage = filter_empty(stage)
    stage = parse_log_line(stage)
    stage = filter_errors(stage)

    # Materialize only the error records, never the whole file
    return list(stage)


# Create a test log file and run the pipeline over it
import tempfile
import os

log_content = """\
2024-01-15 10:00:01 [INFO] Server started
2024-01-15 10:00:02 [DEBUG] DB connection verified
2024-01-15 10:01:05 [ERROR] DB query timed out
2024-01-15 10:01:10 [INFO] Retrying connection
2024-01-15 10:01:15 [ERROR] Reconnection failed

2024-01-15 10:02:00 [CRITICAL] Service down
2024-01-15 10:05:00 [INFO] Recovery complete
"""

with tempfile.NamedTemporaryFile(
    mode="w", suffix=".log", delete=False, encoding="utf-8"
) as tmp:
    tmp.write(log_content)
    log_path = tmp.name

error_records = process_log_file(log_path)
print(f"Error records: {len(error_records)}")
for entry in error_records:
    print(f" [{entry['level']}] {entry['timestamp']}: {entry['message']}")

os.unlink(log_path)

Processing CSV Files in Chunks

import csv
from typing import Generator, Iterator
from pathlib import Path
import io


def read_csv_chunks(
    filepath: str,
    chunk_size: int = 1000,
    encoding: str = "utf-8",
) -> Generator[list[dict], None, None]:
    """Read a CSV file and yield it as lists of row-dicts, chunk_size rows each."""
    with open(filepath, "r", encoding=encoding, newline="") as f:
        buffered: list[dict] = []
        for record in csv.DictReader(f):
            buffered.append(record)
            if len(buffered) >= chunk_size:
                yield buffered
                buffered = []
        # Flush the final partial chunk, if any
        if buffered:
            yield buffered


def process_chunk(chunk: list[dict]) -> dict:
    """Aggregate one chunk of CSV rows.

    Returns {"total": sum of parsed amounts, "count": rows parsed OK,
    "errors": rows whose "amount" could not be converted to float}.
    """
    total = 0.0
    count = 0
    errors = 0

    for row in chunk:
        try:
            # csv.DictReader fills short rows with None (restval), and
            # float(None) raises TypeError — catch it alongside ValueError.
            # The previous `except (ValueError, KeyError)` was wrong:
            # .get() never raises KeyError, so TypeError crashed the run.
            amount = float(row.get("amount", 0))
            total += amount
            count += 1
        except (TypeError, ValueError):
            errors += 1

    return {"total": total, "count": count, "errors": errors}


def aggregate_csv(filepath: str, chunk_size: int = 10000) -> dict:
    """Aggregate a large CSV chunk-by-chunk so memory stays constant."""
    running_total = 0.0
    running_count = 0
    running_errors = 0

    for chunk_no, rows in enumerate(read_csv_chunks(filepath, chunk_size), 1):
        partial = process_chunk(rows)
        running_total += partial["total"]
        running_count += partial["count"]
        running_errors += partial["errors"]

        # Progress heartbeat every 10 chunks
        if chunk_no % 10 == 0:
            print(f" Processing: {running_count:,} rows done...")

    return {
        "total": running_total,
        "count": running_count,
        "errors": running_errors,
        "average": running_total / running_count if running_count else 0,
    }


# Generate a test large CSV and aggregate it
import random
import tempfile

csv_rows = ["id,name,amount,category"]
for idx in range(50000):
    csv_rows.append(
        f"{idx},item_{idx},"
        f"{random.uniform(100, 100000):.2f},"
        f"{random.choice(['A', 'B', 'C'])}"
    )

with tempfile.NamedTemporaryFile(
    mode="w", suffix=".csv", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("\n".join(csv_rows))
    csv_path = tmp.name

result = aggregate_csv(csv_path, chunk_size=5000)
print(f"Total: {result['total']:,.2f}")
print(f"Rows: {result['count']:,}")
print(f"Average: {result['average']:,.2f}")

os.unlink(csv_path)

Processing JSON Files in Chunks

import json
from typing import Generator


def stream_json_array(filepath: str) -> Generator[dict, None, None]:
    """Read a JSON array of objects in a streaming fashion (without ijson).

    Scans character by character, tracking brace depth and string state,
    and yields each top-level object as soon as its closing brace is seen —
    only one object's text is ever buffered at a time.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read(1)  # consume the leading '['
        assert content == "[", "Must be a JSON array."

        depth = 0
        buf = ""
        inside_string = False
        pending_escape = False

        for piece in f:
            for ch in piece:
                # Character right after a backslash is always literal
                if pending_escape:
                    buf += ch
                    pending_escape = False
                    continue

                if ch == "\\" and inside_string:
                    buf += ch
                    pending_escape = True
                    continue

                if ch == '"':
                    inside_string = not inside_string

                if inside_string:
                    # Inside a string: braces/commas have no structural meaning
                    buf += ch
                    continue

                if ch == "{":
                    depth += 1
                    buf += ch
                elif ch == "}":
                    depth -= 1
                    buf += ch
                    if depth == 0:
                        # A complete top-level object — parse and release it
                        yield json.loads(buf.strip())
                        buf = ""
                elif depth == 0 and ch in (",", "]", "\n", " ", "\t"):
                    pass  # separators/whitespace between top-level objects
                else:
                    buf += ch


# Streaming with ijson (pip install ijson)
def stream_json_with_ijson(filepath: str, prefix: str = "item") -> Generator:
    """Stream a large JSON file with ijson, falling back to json.load."""
    try:
        import ijson
        with open(filepath, "rb") as f:
            yield from ijson.items(f, prefix)
    except ImportError:
        print("ijson not installed: pip install ijson")
        # Fallback: parse the whole document at once
        with open(filepath, "r", encoding="utf-8") as f:
            yield from json.load(f)


# Example: generate and process large JSON
import tempfile

json_data = [{"id": i, "value": i * 2.5, "label": f"item_{i}"}
             for i in range(100)]

with tempfile.NamedTemporaryFile(
    mode="w", suffix=".json", delete=False, encoding="utf-8"
) as tmp:
    json.dump(json_data, tmp)
    json_path = tmp.name

# Streaming processing — one record in memory at a time
total = sum(item["value"] for item in stream_json_array(json_path))
print(f"JSON streaming total: {total:.1f}")

os.unlink(json_path)

Measuring Memory Usage with memory_profiler

# pip install memory-profiler
# Usage: add @profile decorator and run with: python -m memory_profiler script.py

# Code example (run in terminal after installing memory_profiler).
# Kept as a bare string literal on purpose — it is illustrative only and
# never executed here. Indentation inside the sample was restored so the
# snippet is copy-pasteable as valid Python.
"""
from memory_profiler import profile

@profile
def load_all_at_once(n: int) -> int:
    # Memory-inefficient: loads entire list into memory
    data = list(range(n))
    return sum(data)

@profile
def stream_process(n: int) -> int:
    # Memory-efficient: uses generator
    return sum(range(n))

load_all_at_once(10_000_000)
stream_process(10_000_000)
"""

# Measuring memory usage from within code
def load_bad():
    """Materialize 5M ints in a list — high peak memory."""
    return list(range(5_000_000))

def load_good():
    """Sum the same range lazily — constant memory."""
    return sum(range(5_000_000))

# BUG FIX: the import itself is what raises ImportError when memory_profiler
# is missing, so it must live INSIDE the try block — previously it sat at
# module level, crashed before the try, and the except branch never ran.
try:
    from memory_profiler import memory_usage

    mem_bad = memory_usage((load_bad, ), max_usage=True)
    mem_good = memory_usage((load_good, ), max_usage=True)
    print(f"List approach: {mem_bad:.1f} MB")
    print(f"Generator approach: {mem_good:.1f} MB")
except ImportError:
    print("memory_profiler not installed — using alternative")
    import tracemalloc

    tracemalloc.start()
    _ = list(range(5_000_000))
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    print(f"List approach peak: {peak / 1024 / 1024:.1f} MB")

sys.getsizeof vs tracemalloc

import sys
import tracemalloc


# sys.getsizeof reports only the object's own (shallow) footprint
sample = list(range(1000))
shallow = sys.getsizeof(sample)
print(f"list object itself: {shallow} bytes")  # pointer array only, ~8000 bytes
print(f"single int object: {sys.getsizeof(0)} bytes")  # ~28 bytes per boxed int
print(f"actual total size: ~{shallow + 1000 * sys.getsizeof(0)} bytes")
# Contained elements are NOT included by sys.getsizeof!

# Recursive size measurement (actual memory used)
def deep_sizeof(obj, seen=None) -> int:
    """Recursively compute the total memory footprint of an object graph."""
    if seen is None:
        seen = set()

    # Count each object once, even when referenced from several places
    marker = id(obj)
    if marker in seen:
        return 0
    seen.add(marker)

    total = sys.getsizeof(obj)
    if isinstance(obj, dict):
        for key, value in obj.items():
            total += deep_sizeof(key, seen) + deep_sizeof(value, seen)
    elif isinstance(obj, (list, tuple, set, frozenset)):
        for element in obj:
            total += deep_sizeof(element, seen)
    return total


data = {"name": "Alice", "scores": [95, 87, 92], "active": True}
print(f"\ndict shallow size: {sys.getsizeof(data)} bytes")
print(f"dict deep size: {deep_sizeof(data)} bytes")


# tracemalloc: trace memory allocations in a code block
print("\n=== tracemalloc measurement ===")

tracemalloc.start()

# Snapshot before the allocation we want to measure
baseline = tracemalloc.take_snapshot()

allocated = [{"id": i, "data": "x" * 100} for i in range(10000)]

after_alloc = tracemalloc.take_snapshot()

# Diff the two snapshots, grouped by source line
top_stats = after_alloc.compare_to(baseline, "lineno")
print("Top 3 memory increases:")
for stat in top_stats[:3]:
    print(f" {stat}")

current, peak = tracemalloc.get_traced_memory()
print(f"\nCurrent usage: {current / 1024:.1f} KB")
print(f"Peak usage: {peak / 1024:.1f} KB")

tracemalloc.stop()

# Cleanup — drop the big allocation
del allocated

# Object size comparison
samples = {
    "empty list": [],
    "100 int list": list(range(100)),
    "100 str list": [str(i) for i in range(100)],
    "100 dict list": [{"k": i} for i in range(100)],
}

print("\n=== Object size comparison ===")
for label, value in samples.items():
    print(f"{label}: {deep_sizeof(value):,} bytes")

Streaming HTTP Response Processing

# pip install requests
import requests
from pathlib import Path


def download_large_file(url: str, destination: str, chunk_size: int = 8192) -> Path:
    """Stream a large download to disk chunk-by-chunk (constant memory)."""
    dest_path = Path(destination)

    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()

        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0

        with open(dest_path, "wb") as out:
            for piece in response.iter_content(chunk_size=chunk_size):
                if not piece:  # skip keep-alive chunks
                    continue
                out.write(piece)
                downloaded += len(piece)

                # Progress line only when the server reported a size
                if total_size:
                    pct = downloaded / total_size * 100
                    print(f"\rDownloading: {downloaded:,} / {total_size:,} bytes "
                          f"({pct:.1f}%)", end="", flush=True)

    print(f"\nDone: {dest_path}")
    return dest_path


def stream_json_api(url: str, params: dict | None = None) -> dict:
    """Fetch a JSON API response, streaming the body for large payloads."""
    with requests.get(url, params=params, stream=True, timeout=30) as response:
        response.raise_for_status()

        # Small responses (< 10 MB declared) are parsed directly
        declared = int(response.headers.get("content-length", 0))
        if declared < 10 * 1024 * 1024:
            return response.json()

        # Large payloads: accumulate raw chunks, then parse once
        import json
        body = b"".join(response.iter_content(chunk_size=8192))
        return json.loads(body)


def stream_lines_from_api(url: str) -> None:
    """Consume a line-oriented streaming API (e.g. Server-Sent Events)."""
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        for raw in response.iter_lines(chunk_size=1024):
            if not raw:
                continue  # skip keep-alive blank lines
            decoded = raw.decode("utf-8")
            # Process each line as it arrives
            print(f"Received: {decoded}")

Real-World: Processing a 10 GB Log File

import re
import os
import gzip
import tempfile
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
from typing import Generator


# Log line parsing pattern
# Matches lines shaped like: "<YYYY-MM-DD HH:MM:SS,mmm> [<LEVEL>] [<module>] <message>"
# Groups: 1 = timestamp (with ms), 2 = level, 3 = module, 4 = message
LOG_PATTERN = re.compile(
    r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"\[(\w+)\] "
    r"\[([^\]]+)\] "
    r"(.*)"
)


def read_log_file(filepath: str) -> Generator[str, None, None]:
    """Yield lines from a plain-text or gzip-compressed log file."""
    path = Path(filepath)
    # gzip.open in text mode decompresses transparently; errors="replace"
    # keeps the stream alive on malformed bytes in either case
    if path.suffix == ".gz":
        stream = gzip.open(filepath, "rt", encoding="utf-8", errors="replace")
    else:
        stream = open(filepath, "r", encoding="utf-8", errors="replace")

    with stream as f:
        yield from (line.rstrip("\n") for line in f)


def parse_log_entry(line: str) -> dict | None:
    """Parse one log line into a dict; returns None when it doesn't match."""
    m = LOG_PATTERN.match(line)
    if m is None:
        return None

    ts_text, level, module, message = m.groups()
    try:
        ts = datetime.strptime(ts_text, "%Y-%m-%d %H:%M:%S,%f")
    except ValueError:
        # Matched the shape but the timestamp is not a real date/time
        return None

    return {
        "timestamp": ts,
        "level": level,
        "module": module,
        "message": message,
    }


def analyze_log_streaming(filepath: str, report_interval: int = 100_000) -> dict:
    """
    Stream-analyze a large log file.

    Reads line-by-line via read_log_file, so the file itself is never held
    in memory — only the small aggregate counters and a bounded sample of
    error messages are retained.
    """
    max_errors_to_keep = 100

    # Aggregates — all bounded in size regardless of file length
    level_counts = Counter()
    module_counts = Counter()
    hourly_stats = defaultdict(Counter)
    error_messages = []  # capped at max_errors_to_keep entries
    total_lines = 0
    parsed_lines = 0
    first_timestamp = None
    last_timestamp = None

    print(f"Starting log analysis: {filepath}")
    start = datetime.now()

    for raw_line in read_log_file(filepath):
        total_lines += 1

        # Periodic progress heartbeat with throughput
        if total_lines % report_interval == 0:
            elapsed = (datetime.now() - start).total_seconds()
            rate = total_lines / elapsed if elapsed else 0
            print(f" Progress: {total_lines:,} lines ({rate:,.0f} lines/sec)")

        entry = parse_log_entry(raw_line)
        if entry is None:
            continue
        parsed_lines += 1

        ts = entry["timestamp"]
        level = entry["level"]
        module = entry["module"]

        # Track the observed time range
        if first_timestamp is None or ts < first_timestamp:
            first_timestamp = ts
        if last_timestamp is None or ts > last_timestamp:
            last_timestamp = ts

        # Aggregate counters
        level_counts[level] += 1
        module_counts[module] += 1
        hourly_stats[ts.strftime("%Y-%m-%d %H:00")][level] += 1

        # Keep a bounded sample of error messages
        if level in ("ERROR", "CRITICAL") and len(error_messages) < max_errors_to_keep:
            error_messages.append({
                "timestamp": ts.isoformat(),
                "level": level,
                "module": module,
                "message": entry["message"][:200],  # cap stored length
            })

    elapsed = (datetime.now() - start).total_seconds()

    return {
        "summary": {
            "total_lines": total_lines,
            "parsed_lines": parsed_lines,
            "parse_rate": parsed_lines / total_lines if total_lines else 0,
            "time_range": {
                "from": first_timestamp.isoformat() if first_timestamp else None,
                "to": last_timestamp.isoformat() if last_timestamp else None,
            },
            "elapsed_seconds": elapsed,
            "lines_per_second": total_lines / elapsed if elapsed else 0,
        },
        "level_counts": dict(level_counts.most_common()),
        "top_modules": dict(module_counts.most_common(10)),
        "error_samples": error_messages[:10],
        "hourly_stats": dict(sorted(hourly_stats.items())),
    }


# Generate a test large log and analyze it
import random

# Weighted level pool followed by module pool — choices drawn per line
levels = ["DEBUG"] * 50 + ["INFO"] * 30 + ["WARNING"] * 12 + ["ERROR"] * 7 + ["CRITICAL"] * 1
modules = ["auth", "database", "api", "cache", "worker", "scheduler"]

log_lines = []
for i in range(10000):
    ts = f"2024-01-15 {10 + i // 3600:02d}:{(i % 3600) // 60:02d}:{i % 60:02d},000"
    level = random.choice(levels)
    module = random.choice(modules)
    if level in ("INFO", "DEBUG"):
        msg = f"Processing complete {i}"
    else:
        msg = f"Error occurred: code {i % 100}"
    log_lines.append(f"{ts} [{level}] [{module}] {msg}")

with tempfile.NamedTemporaryFile(
    mode="w", suffix=".log", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("\n".join(log_lines))
    log_path = tmp.name

report = analyze_log_streaming(log_path, report_interval=2000)

print(f"\n=== Analysis Report ===")
print(f"Total {report['summary']['total_lines']:,} lines")
print(f"Parse success rate: {report['summary']['parse_rate']:.1%}")
print(f"Processing speed: {report['summary']['lines_per_second']:,.0f} lines/sec")
print(f"\nLevel distribution:")
for level, count in report["level_counts"].items():
    bar = "█" * (count // 200)
    print(f" {level:10s}: {count:6,} {bar}")

os.unlink(log_path)

Pro Tips

1. Chunking iterables with itertools

import itertools
from typing import Generator, Iterable


def batched(iterable: Iterable, n: int) -> Generator:
    """Yield successive n-item list batches (cf. itertools.batched, 3.12+)."""
    it = iter(iterable)
    batch = list(itertools.islice(it, n))
    while batch:
        yield batch
        batch = list(itertools.islice(it, n))


# Save large query results to the DB in batches
def save_records_in_batches(records: Iterable[dict], batch_size: int = 1000) -> int:
    """Persist records batch-by-batch; returns how many were processed."""
    total_saved = 0
    for group in batched(records, batch_size):
        # db.bulk_insert(group)  # Actual DB insert would go here
        total_saved += len(group)
        print(f"Batch saved: {len(group)} items (total: {total_saved:,})")
    return total_saved


# Stream synthetic records through the batch saver (5000 records, batches of 1000)
def generate_records(n: int):
    """Lazily produce n synthetic records."""
    yield from ({"id": idx, "value": idx * 2} for idx in range(n))

save_records_in_batches(generate_records(5000), batch_size=1000)

2. Save memory for large numbers of instances with __slots__

import sys


class LogEntryWithDict:
    """Regular class: each instance carries a per-instance ``__dict__``."""

    def __init__(self, timestamp, level, message):
        self.timestamp = timestamp
        self.level = level
        self.message = message


class LogEntryWithSlots:
    """Slotted class: fixed attribute slots, no per-instance ``__dict__``."""

    __slots__ = ("timestamp", "level", "message")

    def __init__(self, timestamp, level, message):
        self.timestamp = timestamp
        self.level = level
        self.message = message


dict_entry = LogEntryWithDict("2024-01-01", "INFO", "test")
slots_entry = LogEntryWithSlots("2024-01-01", "INFO", "test")

print(f"__dict__ version: {sys.getsizeof(dict_entry) + sys.getsizeof(dict_entry.__dict__)} bytes")
print(f"__slots__ version: {sys.getsizeof(slots_entry)} bytes")

# Peak-memory comparison across N instances (N = 100k)
N = 100_000
import tracemalloc

tracemalloc.start()
dict_instances = [LogEntryWithDict("2024-01-01", "INFO", f"message {i}") for i in range(N)]
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"\n__dict__ {N:,} instances: peak {peak / 1024 / 1024:.1f} MB")

del dict_instances

tracemalloc.start()
slot_instances = [LogEntryWithSlots("2024-01-01", "INFO", f"message {i}") for i in range(N)]
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"__slots__ {N:,} instances: peak {peak / 1024 / 1024:.1f} MB")

3. Save memory for homogeneous data with the array module

import array
import sys


# A Python list boxes every element as a full object (heavy overhead)
regular_list = list(range(1_000_000))

# array.array stores raw C values — single type only, far more compact
int_array = array.array("i", range(1_000_000))  # "i" = signed int (4 bytes)
double_array = array.array("d", (float(i) for i in range(1_000_000)))  # "d" = double (8 bytes)

print(f"list size: {sys.getsizeof(regular_list):,} bytes")
print(f"array(int) size: {sys.getsizeof(int_array):,} bytes")
print(f"theoretical int: {4 * 1_000_000:,} bytes (4 bytes × 1M)")

# NumPy goes further still (compact storage + vectorized ops)
try:
    import numpy as np
    np_array = np.arange(1_000_000, dtype=np.int32)
    print(f"NumPy int32: {np_array.nbytes:,} bytes")
except ImportError:
    pass

Summary

| Technique | Memory Effect | When to Use |
| --- | --- | --- |
| Generator pipeline | O(1) memory | Large file processing |
| Chunk-based reading | One chunk at a time | CSV, JSON batch processing |
| tracemalloc | Measures allocations precisely | Diagnosing memory usage |
| __slots__ | 40–50% savings | Creating large numbers of instances |
| array module | Up to 90% savings | Homogeneous numeric data |
| Streaming HTTP | Constant memory | Downloading large files |

Core principle: Only load what you need into memory. Figure out what you need before you try to see everything.

Advertisement