Skip to main content

Ch14 Pro Tips — Completing the DB Layer

Database integration code is both a key performance bottleneck and a source of bugs in applications. We summarize advanced patterns and precautions you must know when writing Go DB code for production.


Connection Pool Tuning

Key Configuration Formula

func setupProductionDB(dsn string) (*sql.DB, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, err
}

// Empirical Formula:
// MaxOpenConns = Number of CPU cores * 2 to 4 (General OLTP)
// = runtime.NumCPU() * 4
db.SetMaxOpenConns(runtime.NumCPU() * 4)

// MaxIdleConns = 50~100% of MaxOpenConns
db.SetMaxIdleConns(runtime.NumCPU() * 4)

// ConnMaxLifetime: Set shorter than DB server wait_timeout
// MySQL wait_timeout default 8 hours → set to 5 minutes
db.SetConnMaxLifetime(5 * time.Minute)

// Do not keep idle connections too long
db.SetConnMaxIdleTime(2 * time.Minute)

return db, db.Ping()
}

Monitoring Connection Pool Status

func monitorDBPool(db *sql.DB) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for range ticker.C {
stats := db.Stats()
slog.Info("DB Pool Status",
"open_connections", stats.OpenConnections,
"in_use", stats.InUse,
"idle", stats.Idle,
"wait_count", stats.WaitCount,
"wait_duration_ms", stats.WaitDuration.Milliseconds(),
"max_idle_closed", stats.MaxIdleClosed,
"max_lifetime_closed", stats.MaxLifetimeClosed,
)

// Consider increasing MaxOpenConns if WaitCount consistently increases
if stats.WaitCount > 100 {
slog.Warn("Excessive DB connection wait - consider adjusting pool size")
}
}
}

Solving N+1 Query Problems

N+1 Problem Pattern

// ❌ N+1 Query Problem: query users 1 time + query orders N times
func getUsersWithOrdersNPlusOne(ctx context.Context, db *sql.DB) {
users, _ := getAllUsers(ctx, db) // Query 1 time
for _, user := range users {
orders, _ := getOrdersByUserID(ctx, db, user.ID) // N additional queries!
fmt.Printf("%s: %d orders\n", user.Name, len(orders))
}
}

// ✅ Solution 1: Use JOIN query
func getUsersWithOrdersJoin(ctx context.Context, db *sql.DB) {
query := `
SELECT u.id, u.name, o.id AS order_id, o.amount
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
ORDER BY u.id
`
rows, _ := db.QueryContext(ctx, query)
defer rows.Close()

type UserOrder struct {
UserID int
Name string
OrderID sql.NullInt64
Amount sql.NullFloat64
}

var results []UserOrder
for rows.Next() {
var uo UserOrder
rows.Scan(&uo.UserID, &uo.Name, &uo.OrderID, &uo.Amount)
results = append(results, uo)
}
}

// ✅ Solution 2: Batch query with IN clause (commonly used in sqlc)
func getUserOrdersBatch(ctx context.Context, db *sql.DB, userIDs []int) map[int][]Order {
if len(userIDs) == 0 {
return nil
}

// Use IN clause with pq.Array (PostgreSQL)
query := `SELECT user_id, id, amount FROM orders WHERE user_id = ANY($1)`
rows, _ := db.QueryContext(ctx, query, pq.Array(userIDs))
defer rows.Close()

result := make(map[int][]Order)
for rows.Next() {
var o Order
rows.Scan(&o.UserID, &o.ID, &o.Amount)
result[o.UserID] = append(result[o.UserID], o)
}
return result
}

DB Layer Testing Strategy

Testing with Real DB using testcontainers

package db_test

import (
"context"
"database/sql"
"testing"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)

func setupTestDB(t *testing.T) *sql.DB {
t.Helper()
ctx := context.Background()

container, err := postgres.Run(ctx,
"docker.io/postgres:16-alpine",
postgres.WithDatabase("testdb"),
postgres.WithUsername("testuser"),
postgres.WithPassword("testpass"),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(2),
),
)
if err != nil {
t.Fatalf("failed to start PostgreSQL container: %v", err)
}
t.Cleanup(func() { container.Terminate(ctx) })

dsn, _ := container.ConnectionString(ctx, "sslmode=disable")
db, err := sql.Open("postgres", dsn)
if err != nil {
t.Fatalf("failed to connect to DB: %v", err)
}
t.Cleanup(func() { db.Close() })

// Apply migrations
runMigrations(t, db)
return db
}

func TestUserRepository_Create(t *testing.T) {
db := setupTestDB(t)
repo := NewUserRepository(db)
ctx := context.Background()

user, err := repo.Create(ctx, "Test User", "test@example.com", 25)
if err != nil {
t.Fatalf("failed to create user: %v", err)
}

if user.ID == 0 {
t.Error("ID should not be 0")
}
if user.Name != "Test User" {
t.Errorf("name mismatch: want=%s, got=%s", "Test User", user.Name)
}
}

Test Isolation with Transaction Rollback

func TestWithTransaction(t *testing.T) {
db := setupTestDB(t)
ctx := context.Background()

// Start transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
// Always rollback after test (isolation without DB cleanup)
defer tx.Rollback()

// Test within transaction
repo := NewUserRepository(tx) // *sql.Tx is also queryable
user, _ := repo.Create(ctx, "Temp User", "temp@example.com", 20)

found, _ := repo.FindByID(ctx, user.ID)
if found == nil {
t.Error("could not find created user")
}

// defer tx.Rollback() automatically cleans up test data
}

Advanced Error Handling Patterns

Handling PostgreSQL Error Codes

import "github.com/lib/pq"

func createUserSafe(ctx context.Context, db *sql.DB, email string) error {
_, err := db.ExecContext(ctx, `INSERT INTO users (email) VALUES ($1)`, email)
if err != nil {
if pqErr, ok := err.(*pq.Error); ok {
switch pqErr.Code {
case "23505": // unique_violation
return fmt.Errorf("email '%s' is already in use", email)
case "23503": // foreign_key_violation
return fmt.Errorf("referential integrity violation: %v", pqErr.Detail)
case "23514": // check_violation
return fmt.Errorf("constraint violation: %v", pqErr.Constraint)
}
}
return fmt.Errorf("failed to create user: %w", err)
}
return nil
}

Applying Context Timeout

func getUserWithTimeout(db *sql.DB, id int) (*User, error) {
// 2-second query timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

var user User
err := db.QueryRowContext(ctx, `SELECT id, name FROM users WHERE id = $1`, id).
Scan(&user.ID, &user.Name)

if err != nil {
if ctx.Err() == context.DeadlineExceeded {
return nil, fmt.Errorf("query timed out (exceeded 2s)")
}
return nil, err
}
return &user, nil
}

GORM Performance Advanced Tips

// ✅ Use Select to query specific columns only (avoid SELECT *)
db.Select("id", "name", "email").Find(&users)

// ✅ Use Rows() for large queries
rows, _ := db.Model(&User{}).Where("active = ?", true).Rows()
defer rows.Close()
for rows.Next() {
var user User
db.ScanRows(rows, &user)
processUser(user) // Streaming without full loading in memory
}

// ✅ Index hints (if necessary)
db.Raw("SELECT /*+ INDEX(users idx_users_email) */ * FROM users WHERE email = ?", email).Scan(&user)

// ✅ Upsert (Insert or Update)
db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "email"}},
DoUpdates: clause.AssignmentColumns([]string{"name", "updated_at"}),
}).Create(&user)

Real-world DB Architecture Patterns

Read/Write Splitting

type DBCluster struct {
primary *sql.DB // For writing
replica *sql.DB // For reading
}

func (c *DBCluster) Write() *sql.DB { return c.primary }
func (c *DBCluster) Read() *sql.DB { return c.replica }

type UserRepository struct {
db *DBCluster
}

func (r *UserRepository) Create(ctx context.Context, user *User) error {
// Write via primary
return r.db.Write().QueryRowContext(ctx, ...).Scan(...)
}

func (r *UserRepository) FindAll(ctx context.Context) ([]User, error) {
// Read via replica
return queryUsers(ctx, r.db.Read(), ...)
}

Defining Repository Interface (Ease of Testing)

type UserRepository interface {
Create(ctx context.Context, name, email string) (*User, error)
FindByID(ctx context.Context, id int) (*User, error)
FindAll(ctx context.Context) ([]User, error)
Update(ctx context.Context, id int, name string) error
Delete(ctx context.Context, id int) error
}

// Actual implementation
type postgresUserRepo struct{ db *sql.DB }

// Mock implementation for testing
type mockUserRepo struct {
users map[int]*User
}

Quick Reference Checklist

Production DB Code Checklist:

  • Pass Context to all queries (QueryContext, ExecContext)
  • Call rows.Close() with defer
  • Check rows.Err() after the loop ends
  • Configure connection pool (SetMaxOpenConns, SetMaxIdleConns, SetConnMaxLifetime)
  • Prevent SQL Injection with parameter binding (strictly no string formatting)
  • Transactions: defer tx.Rollback() + tx.Commit() at the end
  • Wrap DB errors as business errors before returning (%w wrapping)
  • Use streaming for large queries (avoid loading everything into memory)
  • Write migrations with rollback (down) scripts