Skip to main content
Advertisement

CSV Processing — csv Module, DictReader/DictWriter, Large-File Handling

CSV (Comma-Separated Values) is the most widely used data exchange format. Python's csv module handles complex quoting and escaping automatically, and DictReader/DictWriter enable intuitive header-based processing.


csv Module Basics — reader / writer

Reading

import csv

with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f)

# Read header
header = next(reader)
print(f"Columns: {header}")

# Read data
for row in reader:
print(row) # row is a list of strings

Always specify newline="". The csv module must handle newlines itself to correctly process newlines inside quoted fields.

Writing

import csv

rows = [
["Name", "Age", "Email"],
["Alice", 30, "alice@example.com"],
["Bob", 25, "bob@example.com"],
]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(rows[0]) # Single row
writer.writerows(rows[1:]) # Multiple rows at once

DictReader / DictWriter — Header-Based Processing

DictReader

Each row is returned as a dictionary. Column names are used as keys, enabling name-based access rather than position-based.

import csv

with open("employees.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
# reader.fieldnames: header list ['name', 'age', 'dept']

for row in reader:
# row is a dict: {'name': 'Alice', 'age': '30', 'dept': 'Engineering'}
name = row["name"]
age = int(row["age"]) # All values are strings
print(f"{name}: {age}")

You can also specify names for files without headers:

reader = csv.DictReader(f, fieldnames=["id", "name", "score"])

DictWriter

import csv

employees = [
{"name": "Alice", "age": 30, "dept": "Engineering"},
{"name": "Bob", "age": 25, "dept": "Marketing"},
]

fieldnames = ["name", "age", "dept"]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)

writer.writeheader() # Write header row
writer.writerows(employees) # Write data rows

# extrasaction='ignore': ignore extra keys in dictionary
# extrasaction='raise' (default): ValueError on extra keys

csv.Sniffer — Auto-Detect Delimiter

import csv

def auto_detect_dialect(path: str) -> csv.Dialect:
"""Auto-detects the delimiter and format of a CSV file."""
sniffer = csv.Sniffer()
with open(path, "r", encoding="utf-8") as f:
sample = f.read(4096)
dialect = sniffer.sniff(sample)
has_header = sniffer.has_header(sample)
print(f"Delimiter: {dialect.delimiter!r}, Has header: {has_header}")
return dialect

# Use detected dialect
with open("unknown.csv", "r", encoding="utf-8", newline="") as f:
dialect = auto_detect_dialect("unknown.csv")
reader = csv.reader(f, dialect)
for row in reader:
print(row)

Dialect Customization

import csv

# Tab delimiter (TSV)
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter="\t")

# Semicolon delimiter (European Excel)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter=";", quotechar='"')

# Register custom dialect
csv.register_dialect(
"pipe_delimited",
delimiter="|",
quotechar="'",
quoting=csv.QUOTE_MINIMAL,
skipinitialspace=True,
)

with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, dialect="pipe_delimited")

quoting Options

ConstantMeaning
csv.QUOTE_MINIMALQuote only when necessary (default)
csv.QUOTE_ALLQuote all fields
csv.QUOTE_NONNUMERICQuote non-numeric fields
csv.QUOTE_NONENo quoting

Large CSV Streaming

import csv
from typing import Iterator

def stream_csv(
path: str,
chunk_size: int = 1000,
) -> Iterator[list[dict]]:
"""
Streams a large CSV in chunks.
Does not load the entire file into memory.
"""
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
chunk = []
for i, row in enumerate(reader):
chunk.append(row)
if (i + 1) % chunk_size == 0:
yield chunk
chunk = []
if chunk:
yield chunk

# Usage
for chunk in stream_csv("large_data.csv", chunk_size=500):
process_batch(chunk)

Practical Example 1 — Data Transformation

import csv
from datetime import datetime

def normalize_sales_data(
input_path: str,
output_path: str,
) -> dict:
"""
Normalizes sales data CSV.
- Unify date format (YYYY-MM-DD)
- Remove currency symbols from amounts
- Empty values → None
"""
stats = {"processed": 0, "skipped": 0}
output_rows = []

with open(input_path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
try:
# Normalize date
for fmt in ("%Y/%m/%d", "%d-%m-%Y", "%m/%d/%Y"):
try:
dt = datetime.strptime(row["date"], fmt)
row["date"] = dt.strftime("%Y-%m-%d")
break
except ValueError:
continue

# Clean amount
amount_str = row["amount"].replace(",", "").lstrip("$€£")
row["amount"] = float(amount_str) if amount_str else None

# Handle empty values
row = {k: (v.strip() or None) for k, v in row.items()}
output_rows.append(row)
stats["processed"] += 1
except (ValueError, KeyError) as e:
print(f"Skipping row ({e}): {row}")
stats["skipped"] += 1

if output_rows:
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=output_rows[0].keys())
writer.writeheader()
writer.writerows(output_rows)

return stats

Practical Example 2 — Filtering and Aggregation

import csv
from collections import defaultdict
from typing import Callable

def aggregate_csv(
path: str,
group_by: str,
value_col: str,
filter_fn: Callable[[dict], bool] | None = None,
) -> dict[str, float]:
"""
Aggregates CSV by the group_by column.
"""
totals: dict[str, float] = defaultdict(float)
counts: dict[str, int] = defaultdict(int)

with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
if filter_fn and not filter_fn(row):
continue
key = row[group_by]
try:
totals[key] += float(row[value_col])
counts[key] += 1
except (ValueError, KeyError):
pass

return dict(totals)

# Usage example
sales = aggregate_csv(
"sales.csv",
group_by="region",
value_col="amount",
filter_fn=lambda r: r.get("status") == "completed",
)
for region, total in sorted(sales.items(), key=lambda x: -x[1]):
print(f"{region}: {total:,.2f}")

Practical Example 3 — CSV ↔ JSON Conversion

import csv
import json
from pathlib import Path

def csv_to_json(
csv_path: str | Path,
json_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""Converts a CSV file to JSON."""
records = []
with open(csv_path, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(dict(row))

with open(json_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)

return len(records)


def json_to_csv(
json_path: str | Path,
csv_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""Converts a JSON file to CSV."""
with open(json_path, "r", encoding="utf-8") as f:
records: list[dict] = json.load(f)

if not records:
return 0

with open(csv_path, "w", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)

return len(records)

Expert Tips

Tip 1 — Handling UTF-8 with BOM (Windows Excel)

CSV files created by Windows Excel may have a BOM (Byte Order Mark).

# utf-8-sig: auto-handles BOM
with open("excel_export.csv", "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)

Tip 2 — Aggregating large CSV without pandas

import csv
from collections import Counter

def count_by_column(path: str, column: str) -> Counter:
counts: Counter = Counter()
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
counts[row[column]] += 1
return counts

Tip 3 — csv.writer lineterminator

Explicitly specify line endings when generating CSV compatible with Windows Excel.

writer = csv.writer(f, lineterminator="\r\n")  # Windows CRLF
writer = csv.writer(f, lineterminator="\n") # Unix LF

Tip 4 — Automated type conversion

import csv

TYPE_MAP = {
"id": int,
"age": int,
"score": float,
"active": lambda x: x.lower() in ("true", "1", "yes"),
}

def typed_reader(path: str, type_map: dict):
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
yield {
k: type_map[k](v) if k in type_map and v else v
for k, v in row.items()
}
Advertisement