CSV Processing — csv Module, DictReader/DictWriter, Large-File Handling
CSV (Comma-Separated Values) is the most widely used data exchange format. Python's csv module handles complex quoting and escaping automatically, and DictReader/DictWriter enable intuitive header-based processing.
csv Module Basics — reader / writer
Reading
import csv

# newline="" lets the csv module manage line endings itself.
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f)
    # First row: the header.
    header = next(reader)
    print(f"Columns: {header}")
    # Remaining rows: the data records.
    for row in reader:
        print(row)  # each row is a list of strings
Always specify newline="". The csv module must handle newlines itself to correctly process newlines inside quoted fields.
Writing
import csv

rows = [
    ["Name", "Age", "Email"],
    ["Alice", 30, "alice@example.com"],
    ["Bob", 25, "bob@example.com"],
]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(rows[0])     # header: one row at a time
    writer.writerows(rows[1:])   # data: many rows in one call
DictReader / DictWriter — Header-Based Processing
DictReader
Each row is returned as a dictionary. Column names are used as keys, enabling name-based access rather than position-based.
import csv

with open("employees.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    # reader.fieldnames holds the header, e.g. ['name', 'age', 'dept']
    for row in reader:
        # Each row is a dict keyed by column name,
        # e.g. {'name': 'Alice', 'age': '30', 'dept': 'Engineering'}
        name = row["name"]
        age = int(row["age"])  # every CSV value arrives as a string
        print(f"{name}: {age}")
You can also specify names for files without headers:
reader = csv.DictReader(f, fieldnames=["id", "name", "score"])
DictWriter
import csv

employees = [
    {"name": "Alice", "age": 30, "dept": "Engineering"},
    {"name": "Bob", "age": 25, "dept": "Marketing"},
]
fieldnames = ["name", "age", "dept"]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()         # emit the header row
    writer.writerows(employees)  # emit the data rows
# extrasaction='ignore' silently drops dict keys missing from fieldnames;
# the default, extrasaction='raise', raises ValueError on extra keys.
csv.Sniffer — Auto-Detect Delimiter
import csv
def auto_detect_dialect(path: str) -> csv.Dialect:
    """Sniff a CSV file's delimiter/quoting from its first 4 KiB.

    Prints the detected delimiter and whether a header row was found,
    then returns the detected Dialect.
    """
    sniffer = csv.Sniffer()
    with open(path, "r", encoding="utf-8") as f:
        sample = f.read(4096)
    dialect = sniffer.sniff(sample)
    has_header = sniffer.has_header(sample)
    print(f"Delimiter: {dialect.delimiter!r}, Has header: {has_header}")
    return dialect
# Read the file with whatever dialect was detected.
with open("unknown.csv", "r", encoding="utf-8", newline="") as f:
    dialect = auto_detect_dialect("unknown.csv")
    reader = csv.reader(f, dialect)
    for row in reader:
        print(row)
Dialect Customization
import csv

# Tab-delimited files (TSV)
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter="\t")

# Semicolon-delimited files (common in European Excel exports)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, delimiter=";", quotechar='"')

# Register a reusable custom dialect under a name.
csv.register_dialect(
    "pipe_delimited",
    delimiter="|",
    quotechar="'",
    quoting=csv.QUOTE_MINIMAL,
    skipinitialspace=True,
)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
    reader = csv.reader(f, dialect="pipe_delimited")
quoting Options
| Constant | Meaning |
|---|---|
csv.QUOTE_MINIMAL | Quote only when necessary (default) |
csv.QUOTE_ALL | Quote all fields |
csv.QUOTE_NONNUMERIC | Quote all non-numeric fields on write; on read, convert unquoted fields to float |
csv.QUOTE_NONE | Never quote; embedded delimiters must be handled via escapechar |
Large CSV Streaming
import csv
from typing import Iterator
def stream_csv(
    path: str,
    chunk_size: int = 1000,
) -> Iterator[list[dict]]:
    """Yield the rows of *path* in lists of at most *chunk_size* dicts.

    Rows are read lazily via DictReader, so the whole file is never
    held in memory; the final chunk may be shorter than *chunk_size*.
    """
    with open(path, "r", encoding="utf-8", newline="") as f:
        batch: list[dict] = []
        for record in csv.DictReader(f):
            batch.append(record)
            if len(batch) == chunk_size:
                yield batch
                batch = []
        # Flush whatever remains after the last full chunk.
        if batch:
            yield batch
# Usage: consume the file 500 rows at a time.
for chunk in stream_csv("large_data.csv", chunk_size=500):
    process_batch(chunk)
Practical Example 1 — Data Transformation
import csv
from datetime import datetime
def normalize_sales_data(
    input_path: str,
    output_path: str,
) -> dict:
    """
    Normalize a sales-data CSV.

    - Unify the date format to YYYY-MM-DD (accepts YYYY/MM/DD,
      DD-MM-YYYY, or MM/DD/YYYY; unrecognized dates are left as-is).
    - Strip thousands separators and a leading currency symbol from
      amounts and parse them as float.
    - Convert empty string values to None.

    Rows whose amount cannot be parsed, or that lack a 'date'/'amount'
    column, are skipped and counted.

    Returns:
        Stats dict: {"processed": int, "skipped": int}.
    """
    stats = {"processed": 0, "skipped": 0}
    output_rows = []
    with open(input_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                # BUGFIX: normalize empty values FIRST, while every value is
                # still a string. The original ran this step after the float
                # conversion below, so `.strip()` was called on a float and
                # raised an uncaught AttributeError for every valid row.
                row = {k: (v.strip() or None) if isinstance(v, str) else v
                       for k, v in row.items()}
                # Normalize the date by trying each known format in turn.
                date_str = row["date"]
                if date_str:
                    for fmt in ("%Y/%m/%d", "%d-%m-%Y", "%m/%d/%Y"):
                        try:
                            row["date"] = datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
                            break
                        except ValueError:
                            continue
                # Clean the amount: drop thousands separators and any
                # leading currency symbol, then parse as float.
                if row["amount"] is not None:
                    amount_str = row["amount"].replace(",", "").lstrip("$€£")
                    row["amount"] = float(amount_str) if amount_str else None
                output_rows.append(row)
                stats["processed"] += 1
            except (ValueError, KeyError) as e:
                print(f"Skipping row ({e}): {row}")
                stats["skipped"] += 1
    if output_rows:
        with open(output_path, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=output_rows[0].keys())
            writer.writeheader()
            writer.writerows(output_rows)
    return stats
Practical Example 2 — Filtering and Aggregation
import csv
from collections import defaultdict
from typing import Callable
def aggregate_csv(
path: str,
group_by: str,
value_col: str,
filter_fn: Callable[[dict], bool] | None = None,
) -> dict[str, float]:
"""
Aggregates CSV by the group_by column.
"""
totals: dict[str, float] = defaultdict(float)
counts: dict[str, int] = defaultdict(int)
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
if filter_fn and not filter_fn(row):
continue
key = row[group_by]
try:
totals[key] += float(row[value_col])
counts[key] += 1
except (ValueError, KeyError):
pass
return dict(totals)
# Usage example: total completed sales per region, largest first.
sales = aggregate_csv(
    "sales.csv",
    group_by="region",
    value_col="amount",
    filter_fn=lambda r: r.get("status") == "completed",
)
for region, total in sorted(sales.items(), key=lambda x: x[1], reverse=True):
    print(f"{region}: {total:,.2f}")
Practical Example 3 — CSV ↔ JSON Conversion
import csv
import json
from pathlib import Path
def csv_to_json(
csv_path: str | Path,
json_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""Converts a CSV file to JSON."""
records = []
with open(csv_path, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(dict(row))
with open(json_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)
return len(records)
def json_to_csv(
json_path: str | Path,
csv_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""Converts a JSON file to CSV."""
with open(json_path, "r", encoding="utf-8") as f:
records: list[dict] = json.load(f)
if not records:
return 0
with open(csv_path, "w", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)
return len(records)
Expert Tips
Tip 1 — Handling UTF-8 with BOM (Windows Excel)
CSV files created by Windows Excel may have a BOM (Byte Order Mark).
# utf-8-sig transparently strips a leading BOM if one is present.
with open("excel_export.csv", "r", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
Tip 2 — Aggregating large CSV without pandas
import csv
from collections import Counter
def count_by_column(path: str, column: str) -> Counter:
    """Tally how often each distinct value appears in *column*."""
    with open(path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        # Counter consumes the generator row by row — no full-file list.
        return Counter(row[column] for row in reader)
Tip 3 — csv.writer lineterminator
Explicitly specify line endings when generating CSV compatible with Windows Excel.
# Choose the line ending explicitly when Excel compatibility matters.
writer = csv.writer(f, lineterminator="\r\n")  # Windows CRLF
writer = csv.writer(f, lineterminator="\n")    # Unix LF
Tip 4 — Automated type conversion
import csv
# Column name -> converter applied to its non-empty values.
TYPE_MAP = {
    "id": int,
    "age": int,
    "score": float,
    "active": lambda x: x.lower() in ("true", "1", "yes"),
}


def typed_reader(path: str, type_map: dict):
    """Yield CSV rows as dicts, converting values via *type_map*.

    Columns absent from *type_map*, and empty values, pass through as
    the raw strings produced by DictReader.
    """
    with open(path, "r", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            converted = {}
            for key, value in row.items():
                if key in type_map and value:
                    converted[key] = type_map[key](value)
                else:
                    converted[key] = value
            yield converted