Ch14 Pro Tips — Completing the DB Layer
Database integration code is both a key performance bottleneck and a source of bugs in applications. We summarize advanced patterns and precautions you must know when writing Go DB code for production.
Connection Pool Tuning
Key Configuration Formula
// setupProductionDB opens a *sql.DB configured with production-oriented
// connection-pool settings and verifies connectivity before returning.
// On a failed health check the pool is closed so it is never leaked to
// the caller. Tune the formula below against db.Stats() under real load.
func setupProductionDB(dsn string) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Empirical Formula:
	// MaxOpenConns = Number of CPU cores * 2 to 4 (General OLTP)
	maxConns := runtime.NumCPU() * 4
	db.SetMaxOpenConns(maxConns)

	// MaxIdleConns = 50~100% of MaxOpenConns so bursts reuse warm connections.
	db.SetMaxIdleConns(maxConns)

	// ConnMaxLifetime: set shorter than the DB server's idle-kill timeout
	// (e.g. MySQL wait_timeout, default 8 hours) and any LB idle timeout.
	db.SetConnMaxLifetime(5 * time.Minute)

	// Do not keep idle connections too long.
	db.SetConnMaxIdleTime(2 * time.Minute)

	// sql.Open does not dial; Ping verifies the DSN actually works.
	// Close the pool on failure — otherwise it leaks.
	if err := db.Ping(); err != nil {
		db.Close()
		return nil, err
	}
	return db, nil
}
Monitoring Connection Pool Status
// monitorDBPool logs connection-pool statistics every 30 seconds so pool
// sizing can be tuned from real traffic. It blocks forever; run it in its
// own goroutine.
func monitorDBPool(db *sql.DB) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		s := db.Stats()
		slog.Info("DB Pool Status",
			"open_connections", s.OpenConnections,
			"in_use", s.InUse,
			"idle", s.Idle,
			"wait_count", s.WaitCount,
			"wait_duration_ms", s.WaitDuration.Milliseconds(),
			"max_idle_closed", s.MaxIdleClosed,
			"max_lifetime_closed", s.MaxLifetimeClosed,
		)

		// A steadily climbing WaitCount means goroutines are queuing for
		// connections — consider raising MaxOpenConns.
		if s.WaitCount > 100 {
			slog.Warn("Excessive DB connection wait - consider adjusting pool size")
		}
	}
}
Solving N+1 Query Problems
N+1 Problem Pattern
// ❌ N+1 Query Problem: query users 1 time + query orders N times.
// Kept as an anti-pattern example — total round trips grow linearly with
// the number of users returned by the first query.
func getUsersWithOrdersNPlusOne(ctx context.Context, db *sql.DB) {
	users, _ := getAllUsers(ctx, db) // the single up-front query
	for _, u := range users {
		// One extra round trip per user — this is the "+N".
		orders, _ := getOrdersByUserID(ctx, db, u.ID)
		fmt.Printf("%s: %d orders\n", u.Name, len(orders))
	}
}
// ✅ Solution 1: Use JOIN query — one round trip fetches users and their
// orders together. Order columns are NULL for users without orders (LEFT
// JOIN), hence the sql.Null* scan targets.
func getUsersWithOrdersJoin(ctx context.Context, db *sql.DB) {
	query := `
SELECT u.id, u.name, o.id AS order_id, o.amount
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
ORDER BY u.id
`
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		slog.Error("join query failed", "err", err)
		return
	}
	defer rows.Close()

	type UserOrder struct {
		UserID  int
		Name    string
		OrderID sql.NullInt64
		Amount  sql.NullFloat64
	}
	var results []UserOrder
	for rows.Next() {
		var uo UserOrder
		if err := rows.Scan(&uo.UserID, &uo.Name, &uo.OrderID, &uo.Amount); err != nil {
			slog.Error("scan failed", "err", err)
			return
		}
		results = append(results, uo)
	}
	// rows.Err() surfaces errors that terminated iteration early — always
	// check it after the loop.
	if err := rows.Err(); err != nil {
		slog.Error("row iteration failed", "err", err)
	}
	_ = results // demo: results would be consumed/returned here
}
// ✅ Solution 2: Batch query with IN/ANY clause (commonly used in sqlc).
// Returns orders grouped by user ID. Returns nil when userIDs is empty or
// when the query/scan fails — callers treat nil as "no data available".
func getUserOrdersBatch(ctx context.Context, db *sql.DB, userIDs []int) map[int][]Order {
	if len(userIDs) == 0 {
		return nil
	}

	// pq.Array sends the ID list as a single array parameter (PostgreSQL),
	// so one query covers all users and no SQL string building is needed.
	query := `SELECT user_id, id, amount FROM orders WHERE user_id = ANY($1)`
	rows, err := db.QueryContext(ctx, query, pq.Array(userIDs))
	if err != nil {
		return nil
	}
	defer rows.Close()

	result := make(map[int][]Order, len(userIDs))
	for rows.Next() {
		var o Order
		if err := rows.Scan(&o.UserID, &o.ID, &o.Amount); err != nil {
			return nil
		}
		result[o.UserID] = append(result[o.UserID], o)
	}
	// A mid-iteration failure would otherwise yield a silently truncated map.
	if err := rows.Err(); err != nil {
		return nil
	}
	return result
}
DB Layer Testing Strategy
Testing with Real DB using testcontainers
package db_test
import (
"context"
"database/sql"
"testing"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)
// setupTestDB starts a disposable PostgreSQL container, connects to it,
// applies migrations, and registers cleanup via t.Cleanup. Each call gives
// the test a fresh, isolated database.
func setupTestDB(t *testing.T) *sql.DB {
	t.Helper()
	ctx := context.Background()

	container, err := postgres.Run(ctx,
		"docker.io/postgres:16-alpine",
		postgres.WithDatabase("testdb"),
		postgres.WithUsername("testuser"),
		postgres.WithPassword("testpass"),
		// The postgres image logs "ready" twice (it restarts once during
		// init), so wait for the second occurrence before connecting.
		testcontainers.WithWaitStrategy(
			wait.ForLog("database system is ready to accept connections").
				WithOccurrence(2),
		),
	)
	if err != nil {
		t.Fatalf("failed to start PostgreSQL container: %v", err)
	}
	t.Cleanup(func() {
		if err := container.Terminate(ctx); err != nil {
			t.Logf("failed to terminate container: %v", err)
		}
	})

	dsn, err := container.ConnectionString(ctx, "sslmode=disable")
	if err != nil {
		t.Fatalf("failed to get connection string: %v", err)
	}
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatalf("failed to connect to DB: %v", err)
	}
	t.Cleanup(func() { db.Close() })

	// sql.Open does not dial; ping to verify the connection actually works.
	if err := db.PingContext(ctx); err != nil {
		t.Fatalf("failed to ping DB: %v", err)
	}

	// Apply migrations
	runMigrations(t, db)
	return db
}
// TestUserRepository_Create verifies that Create persists a user and
// returns it populated with a DB-assigned ID.
func TestUserRepository_Create(t *testing.T) {
	db := setupTestDB(t)
	repo := NewUserRepository(db)
	ctx := context.Background()

	const wantName = "Test User"
	user, err := repo.Create(ctx, wantName, "test@example.com", 25)
	if err != nil {
		t.Fatalf("failed to create user: %v", err)
	}

	if user.ID == 0 {
		t.Error("ID should not be 0")
	}
	if got := user.Name; got != wantName {
		t.Errorf("name mismatch: want=%s, got=%s", wantName, got)
	}
}
Test Isolation with Transaction Rollback
// TestWithTransaction demonstrates test isolation via transaction rollback:
// every write happens inside a transaction that is always rolled back, so
// no cleanup queries are needed after the test.
func TestWithTransaction(t *testing.T) {
	db := setupTestDB(t)
	ctx := context.Background()

	// Start transaction
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		t.Fatal(err)
	}
	// Always rollback after test (isolation without DB cleanup).
	defer tx.Rollback()

	// *sql.Tx satisfies the same query interface as *sql.DB, so the
	// repository runs entirely inside the transaction.
	repo := NewUserRepository(tx)

	// Check errors explicitly — swallowed errors here would make the test
	// fail on a nil user instead of reporting the real cause.
	user, err := repo.Create(ctx, "Temp User", "temp@example.com", 20)
	if err != nil {
		t.Fatalf("failed to create user: %v", err)
	}
	found, err := repo.FindByID(ctx, user.ID)
	if err != nil {
		t.Fatalf("failed to find user: %v", err)
	}
	if found == nil {
		t.Error("could not find created user")
	}
	// The deferred tx.Rollback() automatically cleans up the test data.
}
Advanced Error Handling Patterns
Handling PostgreSQL Error Codes
import (
	"errors"
	"fmt"

	"github.com/lib/pq"
)
// createUserSafe inserts a user row and maps well-known PostgreSQL error
// codes to user-facing business errors. Any other failure is wrapped with
// %w so callers can still use errors.Is/As on the driver error.
func createUserSafe(ctx context.Context, db *sql.DB, email string) error {
	_, err := db.ExecContext(ctx, `INSERT INTO users (email) VALUES ($1)`, email)
	if err == nil {
		return nil
	}

	// errors.As keeps working even if the driver error is wrapped upstream,
	// unlike a direct type assertion on err.
	var pqErr *pq.Error
	if errors.As(err, &pqErr) {
		switch pqErr.Code {
		case "23505": // unique_violation
			return fmt.Errorf("email '%s' is already in use", email)
		case "23503": // foreign_key_violation
			return fmt.Errorf("referential integrity violation: %v", pqErr.Detail)
		case "23514": // check_violation
			return fmt.Errorf("constraint violation: %v", pqErr.Constraint)
		}
	}
	return fmt.Errorf("failed to create user: %w", err)
}
Applying Context Timeout
// getUserWithTimeout fetches a user by ID with a 2-second query deadline.
// The deadline rides on the query's context, so drivers that support it
// cancel the statement server-side as well.
func getUserWithTimeout(db *sql.DB, id int) (*User, error) {
	// 2-second query timeout
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	var user User
	err := db.QueryRowContext(ctx, `SELECT id, name FROM users WHERE id = $1`, id).
		Scan(&user.ID, &user.Name)
	if err != nil {
		// Inspect the returned error, not ctx.Err(): the deadline can expire
		// after an unrelated failure, which would misreport the cause.
		if errors.Is(err, context.DeadlineExceeded) {
			return nil, fmt.Errorf("query timed out (exceeded 2s)")
		}
		return nil, err
	}
	return &user, nil
}
GORM Performance Advanced Tips
// ✅ Use Select to query specific columns only (avoid SELECT *).
db.Select("id", "name", "email").Find(&users)

// ✅ Use Rows() for large queries: iterate row by row instead of
// materializing the whole result set in a slice.
rows, _ := db.Model(&User{}).Where("active = ?", true).Rows()
defer rows.Close()
for rows.Next() {
	var user User
	// ScanRows maps the current row into the struct via GORM's field mapping.
	db.ScanRows(rows, &user)
	processUser(user) // Streaming without full loading in memory
}

// ✅ Index hints (if necessary) — hint syntax is DB-specific; verify your
// server actually honors it.
db.Raw("SELECT /*+ INDEX(users idx_users_email) */ * FROM users WHERE email = ?", email).Scan(&user)

// ✅ Upsert (Insert or Update): on an email conflict, update name/updated_at
// instead of failing with a unique violation.
db.Clauses(clause.OnConflict{
	Columns:   []clause.Column{{Name: "email"}},
	DoUpdates: clause.AssignmentColumns([]string{"name", "updated_at"}),
}).Create(&user)
Real-world DB Architecture Patterns
Read/Write Splitting
// DBCluster routes statements across a primary/replica pair: all writes go
// to the primary, reads go to the replica. NOTE(review): replica reads may
// lag the primary (replication delay) — confirm callers tolerate staleness.
type DBCluster struct {
	primary *sql.DB // For writing
	replica *sql.DB // For reading
}

// Write returns the primary pool, for mutating statements.
func (c *DBCluster) Write() *sql.DB { return c.primary }

// Read returns the replica pool, for read-only queries.
func (c *DBCluster) Read() *sql.DB { return c.replica }
// UserRepository persists users against the cluster, picking primary or
// replica per operation. (Bodies below are pseudo-code: `...` elides the
// actual SQL and scan targets.)
type UserRepository struct {
	db *DBCluster
}

// Create writes through the primary so the row is durably committed there.
func (r *UserRepository) Create(ctx context.Context, user *User) error {
	// Write via primary
	return r.db.Write().QueryRowContext(ctx, ...).Scan(...)
}

// FindAll serves the read from the replica, offloading the primary.
func (r *UserRepository) FindAll(ctx context.Context) ([]User, error) {
	// Read via replica
	return queryUsers(ctx, r.db.Read(), ...)
}
Defining Repository Interface (Ease of Testing)
// UserRepository abstracts user persistence so services depend on behavior
// rather than *sql.DB — the seam that lets tests swap in a mock.
type UserRepository interface {
	Create(ctx context.Context, name, email string) (*User, error)
	FindByID(ctx context.Context, id int) (*User, error)
	FindAll(ctx context.Context) ([]User, error)
	Update(ctx context.Context, id int, name string) error
	Delete(ctx context.Context, id int) error
}

// Actual implementation, backed by PostgreSQL.
type postgresUserRepo struct{ db *sql.DB }

// Mock implementation for testing: in-memory map keyed by user ID.
type mockUserRepo struct {
	users map[int]*User
}
Quick Reference Checklist
Production DB Code Checklist:
- Pass a `Context` to every query (`QueryContext`, `ExecContext`)
- Call `rows.Close()` with `defer`
- Check `rows.Err()` after the loop ends
- Configure the connection pool (`SetMaxOpenConns`, `SetMaxIdleConns`, `SetConnMaxLifetime`)
- Prevent SQL injection with parameter binding (strictly no string formatting)
- Transactions: `defer tx.Rollback()` plus `tx.Commit()` at the end
- Wrap DB errors as business errors before returning (`%w` wrapping)
- Use streaming for large queries (avoid loading everything into memory)
- Write migrations with rollback (down) scripts