본문으로 건너뛰기
Advertisement

CSV 처리 — csv 모듈, DictReader/DictWriter, 대용량 처리

CSV(Comma-Separated Values)는 가장 보편적인 데이터 교환 형식입니다. Python의 csv 모듈은 복잡한 인용, 이스케이프 처리를 자동으로 해주며, DictReader/DictWriter로 헤더 기반의 직관적인 처리가 가능합니다.


csv 모듈 기본 — reader / writer

읽기

import csv

with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f)

# 헤더 읽기
header = next(reader)
print(f"컬럼: {header}")

# 데이터 읽기
for row in reader:
print(row) # row는 문자열 리스트

newline=""를 반드시 지정하십시오. CSV 파일의 줄바꿈 처리를 csv 모듈에 맡겨야 인용부호 안의 줄바꿈이 올바르게 처리됩니다.

쓰기

import csv

rows = [
["이름", "나이", "이메일"],
["Alice", 30, "alice@example.com"],
["Bob", 25, "bob@example.com"],
]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(rows[0]) # 단일 행
writer.writerows(rows[1:]) # 여러 행 한 번에

DictReader / DictWriter — 헤더 기반 처리

DictReader

각 행이 딕셔너리로 반환됩니다. 컬럼명을 키로 사용하므로 위치가 아닌 이름으로 접근할 수 있습니다.

import csv

with open("employees.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
# reader.fieldnames: 헤더 리스트 ['name', 'age', 'dept']

for row in reader:
# row는 dict: {'name': 'Alice', 'age': '30', 'dept': 'Engineering'}
name = row["name"]
age = int(row["age"]) # 모든 값은 문자열
print(f"{name}: {age}세")

헤더가 없는 파일에 이름을 지정할 수도 있습니다:

reader = csv.DictReader(f, fieldnames=["id", "name", "score"])

DictWriter

import csv

employees = [
{"name": "Alice", "age": 30, "dept": "Engineering"},
{"name": "Bob", "age": 25, "dept": "Marketing"},
]

fieldnames = ["name", "age", "dept"]

with open("output.csv", "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)

writer.writeheader() # 헤더 행 작성
writer.writerows(employees) # 데이터 행들 작성

# extrasaction='ignore': 딕셔너리에 여분 키가 있어도 무시
# extrasaction='raise' (기본값): 여분 키 있으면 ValueError

csv.Sniffer — 구분자 자동 감지

import csv

def auto_detect_dialect(path: str) -> csv.Dialect:
"""CSV 파일의 구분자와 형식을 자동 감지합니다."""
sniffer = csv.Sniffer()
with open(path, "r", encoding="utf-8") as f:
sample = f.read(4096)
dialect = sniffer.sniff(sample)
has_header = sniffer.has_header(sample)
print(f"구분자: {dialect.delimiter!r}, 헤더 있음: {has_header}")
return dialect

# 감지된 dialect 사용
with open("unknown.csv", "r", encoding="utf-8", newline="") as f:
dialect = auto_detect_dialect("unknown.csv")
reader = csv.reader(f, dialect)
for row in reader:
print(row)

Dialect 커스터마이징

import csv

# 탭 구분자 (TSV)
with open("data.tsv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter="\t")

# 세미콜론 구분자 (유럽 Excel)
with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, delimiter=";", quotechar='"')

# 커스텀 Dialect 등록
csv.register_dialect(
"pipe_delimited",
delimiter="|",
quotechar="'",
quoting=csv.QUOTE_MINIMAL,
skipinitialspace=True,
)

with open("data.csv", "r", encoding="utf-8", newline="") as f:
reader = csv.reader(f, dialect="pipe_delimited")

quoting 옵션

상수의미
csv.QUOTE_MINIMAL필요한 경우만 인용 (기본값)
csv.QUOTE_ALL모든 필드 인용
csv.QUOTE_NONNUMERIC숫자가 아닌 필드 인용
csv.QUOTE_NONE인용 없음

대용량 CSV 스트리밍 처리

import csv
from typing import Iterator

def stream_csv(
path: str,
chunk_size: int = 1000,
) -> Iterator[list[dict]]:
"""
대용량 CSV를 청크 단위로 스트리밍합니다.
메모리에 전체 파일을 올리지 않습니다.
"""
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
chunk = []
for i, row in enumerate(reader):
chunk.append(row)
if (i + 1) % chunk_size == 0:
yield chunk
chunk = []
if chunk:
yield chunk

# 사용
for chunk in stream_csv("large_data.csv", chunk_size=500):
process_batch(chunk)

실전 예제 1 — 데이터 변환

import csv
from datetime import datetime

def normalize_sales_data(
input_path: str,
output_path: str,
) -> dict:
"""
판매 데이터 CSV를 정규화합니다.
- 날짜 형식 통일 (YYYY-MM-DD)
- 금액 앞 통화 기호 제거
- 빈 값 → None
"""
stats = {"processed": 0, "skipped": 0}
output_rows = []

with open(input_path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
try:
# 날짜 정규화
for fmt in ("%Y/%m/%d", "%d-%m-%Y", "%m/%d/%Y"):
try:
dt = datetime.strptime(row["date"], fmt)
row["date"] = dt.strftime("%Y-%m-%d")
break
except ValueError:
continue

# 금액 정리
amount_str = row["amount"].replace(",", "").lstrip("$₩€")
row["amount"] = float(amount_str) if amount_str else None

# 빈 값 처리
row = {k: (v.strip() or None) for k, v in row.items()}
output_rows.append(row)
stats["processed"] += 1
except (ValueError, KeyError) as e:
print(f"행 건너뜀 ({e}): {row}")
stats["skipped"] += 1

if output_rows:
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=output_rows[0].keys())
writer.writeheader()
writer.writerows(output_rows)

return stats

실전 예제 2 — 필터링과 집계

import csv
from collections import defaultdict
from typing import Callable

def aggregate_csv(
path: str,
group_by: str,
value_col: str,
filter_fn: Callable[[dict], bool] | None = None,
) -> dict[str, float]:
"""
CSV를 group_by 컬럼 기준으로 집계합니다.
"""
totals: dict[str, float] = defaultdict(float)
counts: dict[str, int] = defaultdict(int)

with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
if filter_fn and not filter_fn(row):
continue
key = row[group_by]
try:
totals[key] += float(row[value_col])
counts[key] += 1
except (ValueError, KeyError):
pass

return dict(totals)

# 사용 예시
sales = aggregate_csv(
"sales.csv",
group_by="region",
value_col="amount",
filter_fn=lambda r: r.get("status") == "completed",
)
for region, total in sorted(sales.items(), key=lambda x: -x[1]):
print(f"{region}: {total:,.0f}원")

실전 예제 3 — CSV ↔ JSON 변환

import csv
import json
from pathlib import Path

def csv_to_json(
csv_path: str | Path,
json_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""CSV 파일을 JSON으로 변환합니다."""
records = []
with open(csv_path, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
records.append(dict(row))

with open(json_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)

return len(records)


def json_to_csv(
json_path: str | Path,
csv_path: str | Path,
encoding: str = "utf-8",
) -> int:
"""JSON 파일을 CSV로 변환합니다."""
with open(json_path, "r", encoding="utf-8") as f:
records: list[dict] = json.load(f)

if not records:
return 0

with open(csv_path, "w", encoding=encoding, newline="") as f:
writer = csv.DictWriter(f, fieldnames=records[0].keys())
writer.writeheader()
writer.writerows(records)

return len(records)

고수 팁

팁 1 — BOM이 있는 UTF-8 파일 처리 (Windows Excel)

Windows Excel이 생성한 CSV는 BOM(Byte Order Mark)이 있을 수 있습니다.

# utf-8-sig: BOM 자동 처리
with open("excel_export.csv", "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)

팁 2 — pandas 없이 대용량 CSV 집계

import csv
from collections import Counter

def count_by_column(path: str, column: str) -> Counter:
counts: Counter = Counter()
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
counts[row[column]] += 1
return counts

팁 3 — csv.writer의 lineterminator

Windows에서 Excel과 호환되는 CSV 생성 시 줄바꿈을 명시합니다.

writer = csv.writer(f, lineterminator="\r\n")  # Windows CRLF
writer = csv.writer(f, lineterminator="\n") # Unix LF

팁 4 — 타입 변환 자동화

import csv

TYPE_MAP = {
"id": int,
"age": int,
"score": float,
"active": lambda x: x.lower() in ("true", "1", "yes"),
}

def typed_reader(path: str, type_map: dict):
with open(path, "r", encoding="utf-8", newline="") as f:
for row in csv.DictReader(f):
yield {
k: type_map[k](v) if k in type_map and v else v
for k, v in row.items()
}
Advertisement