본문으로 건너뛰기

Ch14 실전 고수 팁 — DB 레이어 완성

데이터베이스 연동 코드는 애플리케이션의 핵심 성능 병목이자 버그 발생 지점입니다. 현업에서 Go DB 코드를 작성할 때 반드시 알아야 할 고급 패턴과 주의사항을 정리합니다.


커넥션 풀 튜닝

핵심 설정 공식

func setupProductionDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}

// 경험적 공식:
// MaxOpenConns = CPU 코어 수 * 2 ~ 4 (일반 OLTP)
// = 런타임.NumCPU() * 4
db.SetMaxOpenConns(runtime.NumCPU() * 4)

// MaxIdleConns = MaxOpenConns의 50~100%
db.SetMaxIdleConns(runtime.NumCPU() * 4)

// ConnMaxLifetime: DB 서버 wait_timeout보다 짧게 설정
// MySQL wait_timeout 기본 8시간 → 5분으로 설정
db.SetConnMaxLifetime(5 * time.Minute)

// 유휴 커넥션도 너무 오래 유지하지 않음
db.SetConnMaxIdleTime(2 * time.Minute)

return db, db.Ping()
}

커넥션 풀 상태 모니터링

func monitorDBPool(db *sql.DB) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for range ticker.C {
stats := db.Stats()
slog.Info("DB 풀 상태",
"open_connections", stats.OpenConnections,
"in_use", stats.InUse,
"idle", stats.Idle,
"wait_count", stats.WaitCount,
"wait_duration_ms", stats.WaitDuration.Milliseconds(),
"max_idle_closed", stats.MaxIdleClosed,
"max_lifetime_closed", stats.MaxLifetimeClosed,
)

// WaitCount가 지속적으로 증가하면 MaxOpenConns 증가 고려
if stats.WaitCount > 100 {
slog.Warn("DB 커넥션 대기 과다 - 풀 크기 조정 검토")
}
}
}

N+1 쿼리 문제 해결

N+1 문제 패턴

// ❌ N+1 쿼리 문제: users 1번 + N번 orders 조회
func getUsersWithOrdersNPlusOne(ctx context.Context, db *sql.DB) {
users, _ := getAllUsers(ctx, db) // 쿼리 1번
for _, user := range users {
orders, _ := getOrdersByUserID(ctx, db, user.ID) // N번 추가 쿼리!
fmt.Printf("%s: %d개 주문\n", user.Name, len(orders))
}
}

// ✅ 해결책 1: JOIN 쿼리 사용
func getUsersWithOrdersJoin(ctx context.Context, db *sql.DB) {
query := `
SELECT u.id, u.name, o.id AS order_id, o.amount
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
ORDER BY u.id
`
rows, _ := db.QueryContext(ctx, query)
defer rows.Close()

type UserOrder struct {
UserID int
Name string
OrderID sql.NullInt64
Amount sql.NullFloat64
}

var results []UserOrder
for rows.Next() {
var uo UserOrder
rows.Scan(&uo.UserID, &uo.Name, &uo.OrderID, &uo.Amount)
results = append(results, uo)
}
}

// ✅ 해결책 2: IN 절로 배치 조회 (sqlc에서 주로 사용)
func getUserOrdersBatch(ctx context.Context, db *sql.DB, userIDs []int) map[int][]Order {
if len(userIDs) == 0 {
return nil
}

// pq.Array로 IN 절 사용 (PostgreSQL)
query := `SELECT user_id, id, amount FROM orders WHERE user_id = ANY($1)`
rows, _ := db.QueryContext(ctx, query, pq.Array(userIDs))
defer rows.Close()

result := make(map[int][]Order)
for rows.Next() {
var o Order
rows.Scan(&o.UserID, &o.ID, &o.Amount)
result[o.UserID] = append(result[o.UserID], o)
}
return result
}

DB 레이어 테스트 전략

testcontainers로 실제 DB 테스트

package db_test

import (
"context"
"database/sql"
"testing"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)

func setupTestDB(t *testing.T) *sql.DB {
t.Helper()
ctx := context.Background()

container, err := postgres.Run(ctx,
"docker.io/postgres:16-alpine",
postgres.WithDatabase("testdb"),
postgres.WithUsername("testuser"),
postgres.WithPassword("testpass"),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(2),
),
)
if err != nil {
t.Fatalf("PostgreSQL 컨테이너 시작 실패: %v", err)
}
t.Cleanup(func() { container.Terminate(ctx) })

dsn, _ := container.ConnectionString(ctx, "sslmode=disable")
db, err := sql.Open("postgres", dsn)
if err != nil {
t.Fatalf("DB 연결 실패: %v", err)
}
t.Cleanup(func() { db.Close() })

// 마이그레이션 적용
runMigrations(t, db)
return db
}

func TestUserRepository_Create(t *testing.T) {
db := setupTestDB(t)
repo := NewUserRepository(db)
ctx := context.Background()

user, err := repo.Create(ctx, "테스트", "test@example.com", 25)
if err != nil {
t.Fatalf("사용자 생성 실패: %v", err)
}

if user.ID == 0 {
t.Error("ID가 0이어서는 안 됨")
}
if user.Name != "테스트" {
t.Errorf("이름 불일치: 원하는=%s, 실제=%s", "테스트", user.Name)
}
}

트랜잭션 롤백으로 테스트 격리

func TestWithTransaction(t *testing.T) {
db := setupTestDB(t)
ctx := context.Background()

// 트랜잭션 시작
tx, err := db.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
// 테스트 후 항상 롤백 (DB 초기화 없이 격리)
defer tx.Rollback()

// 트랜잭션 내에서 테스트
repo := NewUserRepository(tx) // *sql.Tx도 쿼리 가능
user, _ := repo.Create(ctx, "임시유저", "temp@example.com", 20)

found, _ := repo.FindByID(ctx, user.ID)
if found == nil {
t.Error("생성된 사용자를 찾을 수 없음")
}

// defer tx.Rollback()으로 테스트 데이터 자동 정리
}

에러 처리 고급 패턴

PostgreSQL 에러 코드 처리

import "github.com/lib/pq"

func createUserSafe(ctx context.Context, db *sql.DB, email string) error {
_, err := db.ExecContext(ctx, `INSERT INTO users (email) VALUES ($1)`, email)
if err != nil {
if pqErr, ok := err.(*pq.Error); ok {
switch pqErr.Code {
case "23505": // unique_violation
return fmt.Errorf("이메일 '%s'는 이미 사용 중입니다", email)
case "23503": // foreign_key_violation
return fmt.Errorf("참조 무결성 위반: %v", pqErr.Detail)
case "23514": // check_violation
return fmt.Errorf("제약 조건 위반: %v", pqErr.Constraint)
}
}
return fmt.Errorf("사용자 생성 실패: %w", err)
}
return nil
}

Context 타임아웃 적용

func getUserWithTimeout(db *sql.DB, id int) (*User, error) {
// 쿼리 타임아웃 2초
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

var user User
err := db.QueryRowContext(ctx, `SELECT id, name FROM users WHERE id = $1`, id).
Scan(&user.ID, &user.Name)

if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return nil, fmt.Errorf("쿼리 타임아웃 (2s 초과)")
}
return nil, err
}
return &user, nil
}

GORM 성능 고급 팁

// ✅ Select로 필요한 컬럼만 조회 (SELECT *)은 피할 것
db.Select("id", "name", "email").Find(&users)

// ✅ 대용량 조회 시 Rows() 사용
rows, _ := db.Model(&User{}).Where("active = ?", true).Rows()
defer rows.Close()
for rows.Next() {
var user User
db.ScanRows(rows, &user)
processUser(user) // 메모리에 전체 로딩 없이 스트리밍 처리
}

// ✅ 인덱스 힌트 (필요시)
db.Raw("SELECT /*+ INDEX(users idx_users_email) */ * FROM users WHERE email = ?", email).Scan(&user)

// ✅ Upsert (Insert or Update)
db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "email"}},
DoUpdates: clause.AssignmentColumns([]string{"name", "updated_at"}),
}).Create(&user)

실전 DB 아키텍처 패턴

Read/Write 분리

type DBCluster struct {
primary *sql.DB // 쓰기용
replica *sql.DB // 읽기용
}

func (c *DBCluster) Write() *sql.DB { return c.primary }
func (c *DBCluster) Read() *sql.DB { return c.replica }

type UserRepository struct {
db *DBCluster
}

func (r *UserRepository) Create(ctx context.Context, user *User) error {
// 쓰기는 primary
return r.db.Write().QueryRowContext(ctx, ...).Scan(...)
}

func (r *UserRepository) FindAll(ctx context.Context) ([]User, error) {
// 읽기는 replica
return queryUsers(ctx, r.db.Read(), ...)
}

Repository 인터페이스 정의 (테스트 용이성)

type UserRepository interface {
Create(ctx context.Context, name, email string) (*User, error)
FindByID(ctx context.Context, id int) (*User, error)
FindAll(ctx context.Context) ([]User, error)
Update(ctx context.Context, id int, name string) error
Delete(ctx context.Context, id int) error
}

// 실제 구현
type postgresUserRepo struct{ db *sql.DB }

// 테스트용 모의 구현
type mockUserRepo struct {
users map[int]*User
}

빠른 참고 체크리스트

프로덕션 DB 코드 점검 목록:

  • 모든 쿼리에 Context 전달 (QueryContext, ExecContext)
  • rows.Close() defer 호출
  • rows.Err() 루프 종료 후 확인
  • 커넥션 풀 설정 (SetMaxOpenConns, SetMaxIdleConns, SetConnMaxLifetime)
  • 파라미터 바인딩으로 SQL Injection 방지 (문자열 포맷팅 절대 금지)
  • 트랜잭션은 defer tx.Rollback() + 마지막에 tx.Commit()
  • DB 에러는 비즈니스 에러로 변환 후 반환 (%w 래핑)
  • 대용량 조회는 스트리밍 처리 (전체 메모리 로딩 지양)
  • 마이그레이션은 롤백(down) 스크립트와 함께 작성