본문으로 건너뛰기
Advertisement

메모리 효율적 데이터 처리

대용량 데이터를 처리할 때는 전체 데이터를 메모리에 올리지 않고 스트리밍 방식으로 처리하는 것이 핵심입니다. 10GB 파일도 일정한 메모리만 사용하여 처리할 수 있습니다.


제너레이터로 대용량 파일 처리

일반 방법은 파일 전체를 메모리에 올리지만, 제너레이터는 한 줄씩 처리하여 메모리를 일정하게 유지합니다.

from pathlib import Path


# ❌ 메모리 비효율 — 전체 파일을 메모리에 로드
def count_lines_bad(filepath: str) -> int:
with open(filepath, "r", encoding="utf-8") as f:
lines = f.readlines() # 수백 MB 또는 수 GB가 메모리에 올라감
return len(lines)


# ✅ 메모리 효율 — 한 줄씩 스트리밍
def count_lines_good(filepath: str) -> int:
count = 0
with open(filepath, "r", encoding="utf-8") as f:
for _ in f: # 파일 객체는 이터레이터 — 한 줄씩 읽음
count += 1
return count


# ✅ 제너레이터 파이프라인 — 여러 처리 단계를 지연 평가로 연결
def read_lines(filepath: str, encoding: str = "utf-8"):
"""파일을 한 줄씩 yield"""
with open(filepath, "r", encoding=encoding) as f:
for line in f:
yield line.rstrip("\n")


def filter_empty(lines):
"""빈 줄 제거 — 다음 제너레이터에 전달"""
for line in lines:
if line.strip():
yield line


def parse_log_line(lines):
"""로그 라인 파싱 — 구조체로 변환"""
import re
pattern = r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.*)"
for line in lines:
match = re.match(pattern, line)
if match:
yield {
"timestamp": match.group(1),
"level": match.group(2),
"message": match.group(3),
}


def filter_errors(records):
"""ERROR 레벨만 통과"""
for record in records:
if record["level"] in ("ERROR", "CRITICAL"):
yield record


# 파이프라인 조립 — 메모리에는 현재 처리 중인 레코드 하나만 존재
def process_log_file(filepath: str) -> list[dict]:
pipeline = (
filter_errors(
parse_log_line(
filter_empty(
read_lines(filepath)
)
)
)
)

# 에러만 수집 (전체 파일이 아닌 에러 레코드만 메모리에 올라감)
return list(pipeline)


# 테스트용 로그 파일 생성
import tempfile
import os

log_content = """\
2024-01-15 10:00:01 [INFO] 서버 시작
2024-01-15 10:00:02 [DEBUG] DB 연결 확인
2024-01-15 10:01:05 [ERROR] DB 쿼리 타임아웃
2024-01-15 10:01:10 [INFO] 재연결 시도
2024-01-15 10:01:15 [ERROR] 재연결 실패

2024-01-15 10:02:00 [CRITICAL] 서비스 중단
2024-01-15 10:05:00 [INFO] 복구 완료
"""

with tempfile.NamedTemporaryFile(mode="w", suffix=".log",
delete=False, encoding="utf-8") as f:
f.write(log_content)
log_path = f.name

errors = process_log_file(log_path)
print(f"에러 레코드: {len(errors)}개")
for err in errors:
print(f" [{err['level']}] {err['timestamp']}: {err['message']}")

os.unlink(log_path)

청크 단위 CSV 처리

import csv
from typing import Generator, Iterator
from pathlib import Path
import io


def read_csv_chunks(
filepath: str,
chunk_size: int = 1000,
encoding: str = "utf-8",
) -> Generator[list[dict], None, None]:
"""CSV를 청크 단위로 읽는 제너레이터"""
with open(filepath, "r", encoding=encoding, newline="") as f:
reader = csv.DictReader(f)
chunk: list[dict] = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk: # 마지막 청크 (chunk_size보다 작을 수 있음)
yield chunk


def process_chunk(chunk: list[dict]) -> dict:
"""청크 단위 집계 처리"""
total = 0
count = 0
errors = 0

for row in chunk:
try:
amount = float(row.get("amount", 0))
total += amount
count += 1
except (ValueError, KeyError):
errors += 1

return {"total": total, "count": count, "errors": errors}


def aggregate_csv(filepath: str, chunk_size: int = 10000) -> dict:
"""대용량 CSV 집계 — 청크 단위로 처리하여 메모리 일정 유지"""
grand_total = 0.0
grand_count = 0
grand_errors = 0

for i, chunk in enumerate(read_csv_chunks(filepath, chunk_size), 1):
result = process_chunk(chunk)
grand_total += result["total"]
grand_count += result["count"]
grand_errors += result["errors"]

if i % 10 == 0:
print(f" 처리 중: {grand_count:,}행 완료...")

return {
"total": grand_total,
"count": grand_count,
"errors": grand_errors,
"average": grand_total / grand_count if grand_count else 0,
}


# 테스트용 대용량 CSV 생성
import random
import tempfile

csv_rows = ["id,name,amount,category"]
for i in range(50000):
csv_rows.append(
f"{i},{f'item_{i}'},"
f"{random.uniform(100, 100000):.2f},"
f"{random.choice(['A', 'B', 'C'])}"
)

with tempfile.NamedTemporaryFile(mode="w", suffix=".csv",
delete=False, encoding="utf-8") as f:
f.write("\n".join(csv_rows))
csv_path = f.name

result = aggregate_csv(csv_path, chunk_size=5000)
print(f"총계: {result['total']:,.2f}")
print(f"행 수: {result['count']:,}")
print(f"평균: {result['average']:,.2f}")

os.unlink(csv_path)

청크 단위 JSON 처리

import json
from typing import Generator


def stream_json_array(filepath: str) -> Generator[dict, None, None]:
"""JSON 배열 파일을 스트리밍 방식으로 읽기 (ijson 미사용)"""
with open(filepath, "r", encoding="utf-8") as f:
content = f.read(1) # '['
assert content == "[", "JSON 배열이어야 합니다."

depth = 0
buffer = ""
in_string = False
escape_next = False

for char in f:
for c in char:
if escape_next:
buffer += c
escape_next = False
continue

if c == "\\" and in_string:
buffer += c
escape_next = True
continue

if c == '"':
in_string = not in_string

if not in_string:
if c == "{":
depth += 1
buffer += c
elif c == "}":
depth -= 1
buffer += c
if depth == 0:
yield json.loads(buffer.strip())
buffer = ""
elif c in (",", "]", "\n", " ", "\t") and depth == 0:
pass
else:
buffer += c
else:
buffer += c


# ijson을 사용한 스트리밍 (pip install ijson)
def stream_json_with_ijson(filepath: str, prefix: str = "item") -> Generator:
"""ijson으로 대용량 JSON 스트리밍"""
try:
import ijson
with open(filepath, "rb") as f:
for item in ijson.items(f, prefix):
yield item
except ImportError:
print("ijson 미설치: pip install ijson")
# 폴백: 전체 로드
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
for item in data:
yield item


# 대용량 JSON 생성 및 처리 예시
import tempfile

json_data = [{"id": i, "value": i * 2.5, "label": f"item_{i}"}
for i in range(100)]

with tempfile.NamedTemporaryFile(mode="w", suffix=".json",
delete=False, encoding="utf-8") as f:
json.dump(json_data, f)
json_path = f.name

# 스트리밍 처리
total = sum(item["value"] for item in stream_json_array(json_path))
print(f"JSON 스트리밍 총합: {total:.1f}")

os.unlink(json_path)

memory_profiler로 메모리 사용량 측정

# pip install memory-profiler
# 사용법: @profile 데코레이터를 붙이고 python -m memory_profiler script.py 로 실행

# 코드 예시 (실제 실행은 memory_profiler 설치 후 터미널에서)
"""
from memory_profiler import profile

@profile
def load_all_at_once(n: int) -> int:
# 메모리 비효율: 전체 리스트를 메모리에 올림
data = list(range(n))
return sum(data)

@profile
def stream_process(n: int) -> int:
# 메모리 효율: 제너레이터 사용
return sum(range(n))

load_all_at_once(10_000_000)
stream_process(10_000_000)
"""

# 메모리 사용량을 코드 내에서 측정
from memory_profiler import memory_usage

def load_bad():
return list(range(5_000_000))

def load_good():
return sum(range(5_000_000))

try:
mem_bad = memory_usage((load_bad, ), max_usage=True)
mem_good = memory_usage((load_good, ), max_usage=True)
print(f"리스트 방식: {mem_bad:.1f} MB")
print(f"제너레이터 방식: {mem_good:.1f} MB")
except ImportError:
print("memory_profiler 미설치 — 대안 방법 사용")
import tracemalloc

tracemalloc.start()
_ = list(range(5_000_000))
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"리스트 방식 피크: {peak / 1024 / 1024:.1f} MB")

sys.getsizeof vs tracemalloc

import sys
import tracemalloc


# sys.getsizeof: 객체 자체의 메모리 크기 (얕은 크기)
lst = list(range(1000))
print(f"list 객체 자체: {sys.getsizeof(lst)} bytes") # ~8000 bytes (포인터 배열)
print(f"int 객체 하나: {sys.getsizeof(0)} bytes") # ~28 bytes
print(f"실제 총 크기: ~{sys.getsizeof(lst) + 1000 * sys.getsizeof(0)} bytes")
# sys.getsizeof는 포함된 객체의 크기를 포함하지 않음!

# 재귀적 크기 측정 (실제 사용 메모리)
def deep_sizeof(obj, seen=None) -> int:
"""객체의 실제 메모리 크기를 재귀적으로 계산"""
if seen is None:
seen = set()
obj_id = id(obj)
if obj_id in seen:
return 0
seen.add(obj_id)
size = sys.getsizeof(obj)

if isinstance(obj, dict):
size += sum(deep_sizeof(k, seen) + deep_sizeof(v, seen)
for k, v in obj.items())
elif isinstance(obj, (list, tuple, set, frozenset)):
size += sum(deep_sizeof(item, seen) for item in obj)

return size


data = {"name": "김철수", "scores": [95, 87, 92], "active": True}
print(f"\ndict 얕은 크기: {sys.getsizeof(data)} bytes")
print(f"dict 깊은 크기: {deep_sizeof(data)} bytes")


# tracemalloc: 코드 블록의 메모리 할당 추적
print("\n=== tracemalloc 측정 ===")

tracemalloc.start()

# 측정할 코드
snapshot_before = tracemalloc.take_snapshot()

large_list = [{"id": i, "data": "x" * 100} for i in range(10000)]

snapshot_after = tracemalloc.take_snapshot()

# 차이 분석
top_stats = snapshot_after.compare_to(snapshot_before, "lineno")
print("메모리 증가 상위 3개:")
for stat in top_stats[:3]:
print(f" {stat}")

current, peak = tracemalloc.get_traced_memory()
print(f"\n현재 사용: {current / 1024:.1f} KB")
print(f"피크 사용: {peak / 1024:.1f} KB")

tracemalloc.stop()

# 정리
del large_list

# 객체 크기 비교
items = {
"빈 list": [],
"100개 int list": list(range(100)),
"100개 str list": [str(i) for i in range(100)],
"100개 dict list": [{"k": i} for i in range(100)],
}

print("\n=== 객체 크기 비교 ===")
for name, obj in items.items():
print(f"{name}: {deep_sizeof(obj):,} bytes")

스트리밍 HTTP 응답 처리

# pip install requests
import requests
from pathlib import Path


def download_large_file(url: str, destination: str, chunk_size: int = 8192) -> Path:
"""대용량 파일을 청크 단위로 다운로드 — 전체를 메모리에 올리지 않음"""
dest_path = Path(destination)

with requests.get(url, stream=True, timeout=30) as response:
response.raise_for_status()

total_size = int(response.headers.get("content-length", 0))
downloaded = 0

with open(dest_path, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk: # 빈 청크 필터링
f.write(chunk)
downloaded += len(chunk)

if total_size:
pct = downloaded / total_size * 100
print(f"\r다운로드: {downloaded:,} / {total_size:,} bytes "
f"({pct:.1f}%)", end="", flush=True)

print(f"\n완료: {dest_path}")
return dest_path


def stream_json_api(url: str, params: dict | None = None) -> dict:
"""스트리밍으로 JSON API 응답 처리"""
with requests.get(url, params=params, stream=True, timeout=30) as response:
response.raise_for_status()

# 작은 JSON이면 한 번에 파싱
content_length = int(response.headers.get("content-length", 0))
if content_length < 10 * 1024 * 1024: # 10MB 미만
return response.json()

# 대용량이면 청크로 수집 후 파싱
import json
chunks = []
for chunk in response.iter_content(chunk_size=8192):
chunks.append(chunk)
return json.loads(b"".join(chunks))


def stream_lines_from_api(url: str) -> None:
"""스트리밍 API (Server-Sent Events 등)에서 줄 단위로 처리"""
with requests.get(url, stream=True, timeout=60) as response:
response.raise_for_status()
for line in response.iter_lines(chunk_size=1024):
if line:
decoded = line.decode("utf-8")
# 각 라인 처리
print(f"수신: {decoded}")

실전: 10GB 로그 파일 처리 패턴

import re
import os
import gzip
import tempfile
from pathlib import Path
from collections import defaultdict, Counter
from datetime import datetime
from typing import Generator


# 로그 라인 파싱 패턴
LOG_PATTERN = re.compile(
r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
r"\[(\w+)\] "
r"\[([^\]]+)\] "
r"(.*)"
)


def read_log_file(filepath: str) -> Generator[str, None, None]:
"""일반 파일과 gzip 파일 모두 지원하는 라인 제너레이터"""
path = Path(filepath)
if path.suffix == ".gz":
opener = gzip.open(filepath, "rt", encoding="utf-8", errors="replace")
else:
opener = open(filepath, "r", encoding="utf-8", errors="replace")

with opener as f:
for line in f:
yield line.rstrip("\n")


def parse_log_entry(line: str) -> dict | None:
"""로그 라인 파싱 — 파싱 실패 시 None 반환"""
match = LOG_PATTERN.match(line)
if not match:
return None

timestamp_str, level, module, message = match.groups()
try:
timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S,%f")
except ValueError:
return None

return {
"timestamp": timestamp,
"level": level,
"module": module,
"message": message,
}


def analyze_log_streaming(filepath: str, report_interval: int = 100_000) -> dict:
"""
대용량 로그 파일 스트리밍 분석
- 전체 파일을 메모리에 올리지 않음
- 집계 결과만 메모리에 유지
"""
# 집계 데이터 (작음 — 메모리 안전)
level_counts = Counter()
module_counts = Counter()
error_messages = [] # 최대 N개만 보관
max_errors_to_keep = 100
total_lines = 0
parsed_lines = 0
first_timestamp = None
last_timestamp = None
hourly_stats = defaultdict(Counter)

print(f"로그 분석 시작: {filepath}")
start = datetime.now()

for line in read_log_file(filepath):
total_lines += 1

if total_lines % report_interval == 0:
elapsed = (datetime.now() - start).total_seconds()
rate = total_lines / elapsed if elapsed else 0
print(f" 진행: {total_lines:,}줄 처리 ({rate:,.0f} 줄/초)")

entry = parse_log_entry(line)
if entry is None:
continue

parsed_lines += 1
ts = entry["timestamp"]
level = entry["level"]
module = entry["module"]

# 시간 범위 추적
if first_timestamp is None or ts < first_timestamp:
first_timestamp = ts
if last_timestamp is None or ts > last_timestamp:
last_timestamp = ts

# 집계
level_counts[level] += 1
module_counts[module] += 1
hour_key = ts.strftime("%Y-%m-%d %H:00")
hourly_stats[hour_key][level] += 1

# 에러 메시지 샘플 수집 (최대 N개)
if level in ("ERROR", "CRITICAL") and len(error_messages) < max_errors_to_keep:
error_messages.append({
"timestamp": ts.isoformat(),
"level": level,
"module": module,
"message": entry["message"][:200], # 길이 제한
})

elapsed = (datetime.now() - start).total_seconds()

return {
"summary": {
"total_lines": total_lines,
"parsed_lines": parsed_lines,
"parse_rate": parsed_lines / total_lines if total_lines else 0,
"time_range": {
"from": first_timestamp.isoformat() if first_timestamp else None,
"to": last_timestamp.isoformat() if last_timestamp else None,
},
"elapsed_seconds": elapsed,
"lines_per_second": total_lines / elapsed if elapsed else 0,
},
"level_counts": dict(level_counts.most_common()),
"top_modules": dict(module_counts.most_common(10)),
"error_samples": error_messages[:10],
"hourly_stats": dict(sorted(hourly_stats.items())),
}


# 테스트용 대용량 로그 생성 및 분석
import random

log_lines = []
levels = ["DEBUG"] * 50 + ["INFO"] * 30 + ["WARNING"] * 12 + ["ERROR"] * 7 + ["CRITICAL"] * 1
modules = ["auth", "database", "api", "cache", "worker", "scheduler"]

for i in range(10000):
ts = f"2024-01-15 {10 + i // 3600:02d}:{(i % 3600) // 60:02d}:{i % 60:02d},000"
level = random.choice(levels)
module = random.choice(modules)
msg = f"처리 완료 {i}" if level in ("INFO", "DEBUG") else f"오류 발생: 코드 {i % 100}"
log_lines.append(f"{ts} [{level}] [{module}] {msg}")

with tempfile.NamedTemporaryFile(mode="w", suffix=".log",
delete=False, encoding="utf-8") as f:
f.write("\n".join(log_lines))
log_path = f.name

report = analyze_log_streaming(log_path, report_interval=2000)

print(f"\n=== 분석 리포트 ===")
print(f"총 {report['summary']['total_lines']:,}줄")
print(f"파싱 성공률: {report['summary']['parse_rate']:.1%}")
print(f"처리 속도: {report['summary']['lines_per_second']:,.0f} 줄/초")
print(f"\n레벨별 분포:")
for level, count in report["level_counts"].items():
bar = "█" * (count // 200)
print(f" {level:10s}: {count:6,} {bar}")

os.unlink(log_path)

고수 팁

1. itertools로 청크 나누기

import itertools
from typing import Generator, Iterable


def batched(iterable: Iterable, n: int) -> Generator:
"""이터러블을 n개씩 묶어서 반환 (Python 3.12+: itertools.batched)"""
it = iter(iterable)
while batch := list(itertools.islice(it, n)):
yield batch


# 대용량 쿼리 결과를 배치로 DB에 저장
def save_records_in_batches(records: Iterable[dict], batch_size: int = 1000) -> int:
total_saved = 0
for batch in batched(records, batch_size):
# db.bulk_insert(batch) # 실제 DB 저장
total_saved += len(batch)
print(f"배치 저장: {len(batch)}개 (누적: {total_saved:,}개)")
return total_saved


# 100만 레코드를 1000개씩 배치 처리
def generate_records(n: int):
for i in range(n):
yield {"id": i, "value": i * 2}

save_records_in_batches(generate_records(5000), batch_size=1000)

2. __slots__로 대량 인스턴스 메모리 절약

import sys


class LogEntryWithDict:
def __init__(self, timestamp, level, message):
self.timestamp = timestamp
self.level = level
self.message = message


class LogEntryWithSlots:
__slots__ = ("timestamp", "level", "message")

def __init__(self, timestamp, level, message):
self.timestamp = timestamp
self.level = level
self.message = message


e1 = LogEntryWithDict("2024-01-01", "INFO", "테스트")
e2 = LogEntryWithSlots("2024-01-01", "INFO", "테스트")

print(f"__dict__ 버전: {sys.getsizeof(e1) + sys.getsizeof(e1.__dict__)} bytes")
print(f"__slots__ 버전: {sys.getsizeof(e2)} bytes")

# 100만 개 인스턴스에서 차이 확인
N = 100_000
import tracemalloc

tracemalloc.start()
entries_dict = [LogEntryWithDict("2024-01-01", "INFO", f"메시지 {i}") for i in range(N)]
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"\n__dict__ {N:,}개: 피크 {peak / 1024 / 1024:.1f} MB")

del entries_dict
tracemalloc.start()
entries_slots = [LogEntryWithSlots("2024-01-01", "INFO", f"메시지 {i}") for i in range(N)]
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"__slots__ {N:,}개: 피크 {peak / 1024 / 1024:.1f} MB")

3. array 모듈로 동질 데이터 절약

import array
import sys


# Python 리스트: 각 요소가 Python 객체 (오버헤드 큼)
regular_list = list(range(1_000_000))

# array 모듈: C 배열 방식 (동일 타입만 저장, 메모리 효율적)
int_array = array.array("i", range(1_000_000)) # 'i' = signed int (4 bytes)
double_array = array.array("d", (float(i) for i in range(1_000_000))) # 'd' = double (8 bytes)

print(f"리스트 크기: {sys.getsizeof(regular_list):,} bytes")
print(f"array(int) 크기: {sys.getsizeof(int_array):,} bytes")
print(f"이론적 int 크기: {4 * 1_000_000:,} bytes (4 bytes × 100만)")

# NumPy가 있으면 더 강력함
try:
import numpy as np
np_array = np.arange(1_000_000, dtype=np.int32)
print(f"NumPy int32: {np_array.nbytes:,} bytes")
except ImportError:
pass

정리

기법메모리 효과사용 시기
제너레이터 파이프라인O(1) 메모리대용량 파일 처리
청크 단위 읽기청크 크기만큼CSV, JSON 배치 처리
tracemalloc메모리 사용량 측정
__slots__40~50% 절약대량 인스턴스 생성
array 모듈최대 90% 절약동질 숫자 데이터
스트리밍 HTTP고정 메모리대용량 파일 다운로드

핵심 원칙: 필요한 데이터만 메모리에 올려라. 전체를 보기 전에 필요한 것이 무엇인지 먼저 파악하라.

Advertisement