CSV 처리 — csv 모듈, DictReader/DictWriter, 대용량 처리
CSV(Comma-Separated Values)는 가장 보편적인 데이터 교환 형식입니다. Python의 csv 모듈은 복잡한 인용, 이스케이프 처리를 자동으로 해주며, DictReader/DictWriter로 헤더 기반의 직관적인 처리가 가능합니다.
csv 모듈 기본 — reader / writer
읽기
import csv
with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f)
# 헤더 읽기
header = next(reader)
print(f"컬럼: {header}")
# 데이터 읽기
for row in reader:
print(row) # row는 문자열 리스트
newline=""를 반드시 지정하십시오. CSV 파일의 줄바꿈 처리를 csv 모듈에 맡겨야 인용부호 안의 줄바꿈이 올바르게 처리됩니다.
쓰기
import csv
rows = [
["이름", "나이", "이메일"],
["Alice", 30, "alice@example.com"],
["Bob", 25, "bob@example.com"],
]
with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(rows[0]) # 단일 행
writer.writerows(rows[1:]) # 여러 행 한 번에
DictReader / DictWriter — 헤더 기반 처리
DictReader
각 행이 딕셔너리로 반환됩니다. 컬럼명을 키로 사용하므로 위치가 아닌 이름으로 접근할 수 있습니다.
import csv
with open("employees.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
# reader.fieldnames: 헤더 리스트 ['name', 'age', 'dept']
for row in reader:
# row는 dict: {'name': 'Alice', 'age': '30', 'dept': 'Engineering'}
name = row["name"]
age = int(row["age"]) # 모든 값은 문자열
print(f"{name}: {age}세")
헤더가 없는 파일에 이름을 지정할 수도 있습니다:
reader = csv.DictReader(f, fieldnames=["id", "name", "score"])
DictWriter
import csv
employees = [
{"name": "Alice", "age": 30, "dept": "Engineering"},
{"name": "Bob", "age": 25, "dept": "Marketing"},
]
fieldnames = ["name", "age", "dept"]
with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader() # 헤더 행 작성
writer.writerows(employees) # 데이터 행들 작성
# extrasaction='ignore': 딕셔너리에 여분 키가 있어도 무시
# extrasaction='raise' (기본값): 여분 키 있으면 ValueError
csv.Sniffer — 구분자 자동 감지
import csv
def auto_detect_dialect(path: str) -> csv.Dialect:
"""CSV 파일의 구분자와 형식을 자동 감지합니다."""
sniffer = csv.Sniffer()
with open(path, "r", encoding="utf-8") as f:
sample = f.read(4096)
dialect = sniffer.sniff(sample)
has_header = sniffer.has_header(sample)
print(f"구분자: {dialect.delimiter!r}, 헤더 있음: {has_header}")
return dialect
# 감지된 dialect 사용
with open("unknown.csv", "r", encoding="utf-8", newline="") as f:
dialect = auto_detect_dialect("unknown.csv")
reader = csv.reader(f, dialect)
for row in reader:
print(row)
Dialect 커스터마이징
import csv
# 탭 구분자 (TSV)
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter="\t")
# 세미콜론 구분자 (유럽 Excel)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter=";", quotechar='"')
# 커스텀 Dialect 등록
csv.register_dialect(
"pipe_delimited",
delimiter="|",
quotechar="'",
quoting=csv.QUOTE_MINIMAL,
skipinitialspace=True,
)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, dialect="pipe_delimited")
quoting 옵션
| 상수 | 의미 |
|---|---|
csv.QUOTE_MINIMAL | 필요한 경우만 인용 (기본값) |
csv.QUOTE_ALL | 모든 필드 인용 |
csv.QUOTE_NONNUMERIC | 숫자가 아닌 필드 인용 |
csv.QUOTE_NONE | 인용 없음 |
대용량 CSV 스트리밍 처리
import csv
from typing import Iterator
def stream_csv(
path: str,
chunk_size: int = 1000,
) -> Iterator[list[dict]]:
"""
대용량 CSV를 청크 단위로 스트리밍합니다.
메모리에 전체 파일을 올리지 않습니다.
"""
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
chunk = []
for i, row in enumerate(reader):
chunk.append(row)
if (i + 1) % chunk_size == 0:
yield chunk
chunk = []
if chunk:
yield chunk
# 사용
for chunk in stream_csv("large_data.csv", chunk_size=500):
process_batch(chunk)
실전 예제 1 — 데이터 변환
import csv
from datetime import datetime
def normalize_sales_data(
input_path: str,
output_path: str,
) -> dict:
"""
판매 데이터 CSV를 정규화합니다.
- 날짜 형식 통일 (YYYY-MM-DD)
- 금액 앞 통화 기호 제거
- 빈 값 → None
"""
stats = {"processed": 0, "skipped": 0}
output_rows = []
with open(input_path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
try:
# 날짜 정규화
for fmt in ("%Y/%m/%d", "%d-%m-%Y", "%m/%d/%Y"):
try:
dt = datetime.strptime(row["date"], fmt)
row["date"] = dt.strftime("%Y-%m-%d")
break
except ValueError:
continue
# 금액 정리
amount_str = row["amount"].replace(",", "").lstrip("$₩€")
row["amount"] = float(amount_str) if amount_str else None
# 빈 값 처리
row = {k: (v.strip() or None) for k, v in row.items()}
output_rows.append(row)
stats["processed"] += 1
except (ValueError, KeyError) as e:
print(f"행 건너뜀 ({e}): {row}")
stats["skipped"] += 1
if output_rows:
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=output_rows[0].keys())
writer.writeheader()
writer.writerows(output_rows)
return stats
실전 예제 2 — 필터링과 집계
import csv
from collections import defaultdict
from typing import Callable
def aggregate_csv(
path: str,
group_by: str,
value_col: str,
filter_fn: Callable[[dict], bool] | None = None,
) -> dict[str, float]:
"""
CSV를 group_by 컬럼 기준으로 집계합니다.
"""
totals: dict[str, float] = defaultdict(float)
counts: dict[str, int] = defaultdict(int)
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
if filter_fn and not filter_fn(row):
continue
key = row[group_by]
try:
totals[key] += float(row[value_col])
counts[key] += 1
except (ValueError, KeyError):
pass
return dict(totals)
# 사용 예시
sales = aggregate_csv(
"sales.csv",
group_by="region",
value_col="amount",
filter_fn=lambda r: r.get("status") == "completed",
)
for region, total in sorted(sales.items(), key=lambda x: -x[1]):
print(f"{region}: {total:,.0f}원")
실전 예제 3 — CSV ↔ JSON 변환
import csv
import json
from pathlib import Path
def csv_to_json(
csv_path: str | Path,
json_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""CSV 파일을 JSON으로 변환합니다."""
records = []
with open(csv_path, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(dict(row))
with open(json_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)
return len(records)
def json_to_csv(
json_path: str | Path,
csv_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""JSON 파일을 CSV로 변환합니다."""
with open(json_path, "r", encoding="utf-8") as f:
records: list[dict] = json.load(f)
if not records:
return 0
with open(csv_path, "w", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)
return len(records)
고수 팁
팁 1 — BOM이 있는 UTF-8 파일 처리 (Windows Excel)
Windows Excel이 생성한 CSV는 BOM(Byte Order Mark)이 있을 수 있습니다.
# utf-8-sig: BOM 자동 처리
with open("excel_export.csv", "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
팁 2 — pandas 없이 대용량 CSV 집계
import csv
from collections import Counter
def count_by_column(path: str, column: str) -> Counter:
counts: Counter = Counter()
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
counts[row[column]] += 1
return counts
팁 3 — csv.writer의 lineterminator
Windows에서 Excel과 호환되는 CSV 생성 시 줄바꿈을 명시합니다.
writer = csv.writer(f, lineterminator="\r\n") # Windows CRLF
writer = csv.writer(f, lineterminator="\n") # Unix LF
팁 4 — 타입 변환 자동화
import csv
TYPE_MAP = {
"id": int,
"age": int,
"score": float,
"active": lambda x: x.lower() in ("true", "1", "yes"),
}
def typed_reader(path: str, type_map: dict):
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
yield {
k: type_map[k](v) if k in type_map and v else v
for k, v in row.items()
}