본문으로 건너뛰기

실전 고수 팁 — 동시성 패턴과 함정

데이터 레이스 감지 — Race Detector

Go는 -race 플래그로 런타임 데이터 레이스를 감지할 수 있습니다.

# 빌드 시 race detector 활성화
go build -race ./...

# 테스트 시 race detector
go test -race ./...

# 실행 시
go run -race main.go
package main

import (
"fmt"
"sync"
)

// ❌ 데이터 레이스 — go run -race로 감지됨
func withRace() {
count := 0
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
count++ // DATA RACE!
}()
}
wg.Wait()
fmt.Println(count)
}

// ✅ atomic 패키지로 해결
func withAtomic() {
var count int64
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// atomic.AddInt64(&count, 1)
_ = count
}()
}
wg.Wait()
}

func main() {
withRace()
}

race detector 활용 팁:

  • CI/CD 파이프라인에 go test -race ./... 반드시 포함
  • 프로덕션 서버에서도 -race 빌드를 스테이징 환경에서 실행
  • 감지 결과는 정확하지만 모든 레이스를 감지하지는 못함 (실행 경로에 의존)

채널 vs Mutex 선택 기준

채널을 선택할 때:
- 데이터를 한 고루틴에서 다른 고루틴으로 전달할 때
- 작업 분배 / 결과 수집
- 비동기 파이프라인 구성
- 타임아웃이나 취소가 필요할 때

Mutex를 선택할 때:
- 공유 상태(캐시, 카운터)를 보호할 때
- 임계 영역이 짧고 단순할 때
- 구조체 내부 상태 보호
package main

import (
"fmt"
"sync"
"sync/atomic"
)

// 단순 카운터: atomic 패키지가 가장 적합
type AtomicCounter struct {
count int64
}

func (c *AtomicCounter) Inc() {
atomic.AddInt64(&c.count, 1)
}

func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.count)
}

// 복잡한 상태: Mutex
type Stats struct {
mu sync.Mutex
hits int
misses int
latency []float64
}

func (s *Stats) RecordHit(latency float64) {
s.mu.Lock()
defer s.mu.Unlock()
s.hits++
s.latency = append(s.latency, latency)
}

// 작업 분배: 채널
func distribute(tasks []int, workers int) []int {
ch := make(chan int, len(tasks))
results := make(chan int, len(tasks))

for i := 0; i < workers; i++ {
go func() {
for t := range ch {
results <- t * t
}
}()
}

for _, t := range tasks {
ch <- t
}
close(ch)

out := make([]int, 0, len(tasks))
for range tasks {
out = append(out, <-results)
}
return out
}

func main() {
c := &AtomicCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c.Inc()
}()
}
wg.Wait()
fmt.Println("카운트:", c.Value())
}

워커 풀 패턴

고루틴 수를 제한해서 시스템 리소스를 보호합니다.

package main

import (
"context"
"fmt"
"sync"
"time"
)

type Job struct {
ID int
Data string
}

type Result struct {
JobID int
Output string
Err error
}

func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
fmt.Printf("Worker %d: 작업 채널 닫힘, 종료\n", id)
return
}
// 실제 작업 처리
time.Sleep(100 * time.Millisecond)
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("[Worker %d] 처리: %s", id, job.Data),
}
case <-ctx.Done():
fmt.Printf("Worker %d: 컨텍스트 취소, 종료\n", id)
return
}
}
}

func runWorkerPool(ctx context.Context, numWorkers int, jobList []Job) []Result {
jobs := make(chan Job, len(jobList))
results := make(chan Result, len(jobList))

var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(ctx, i+1, jobs, results, &wg)
}

// 작업 전송
for _, job := range jobList {
jobs <- job
}
close(jobs)

// 모든 워커 종료 후 결과 채널 닫기
go func() {
wg.Wait()
close(results)
}()

// 결과 수집
var out []Result
for r := range results {
out = append(out, r)
}
return out
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

jobs := make([]Job, 10)
for i := range jobs {
jobs[i] = Job{ID: i + 1, Data: fmt.Sprintf("task-%d", i+1)}
}

results := runWorkerPool(ctx, 3, jobs)
fmt.Printf("완료된 작업 수: %d\n", len(results))
for _, r := range results {
fmt.Printf(" [%d] %s\n", r.JobID, r.Output)
}
}

세마포어 패턴 — 동시성 제한

버퍼드 채널로 세마포어를 구현해 동시 실행 수를 제한합니다.

package main

import (
"context"
"fmt"
"sync"
"time"
)

type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
return make(Semaphore, n)
}

func (s Semaphore) Acquire(ctx context.Context) error {
select {
case s <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (s Semaphore) Release() {
<-s
}

func main() {
sem := NewSemaphore(3) // 동시에 최대 3개만 실행
ctx := context.Background()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
if err := sem.Acquire(ctx); err != nil {
fmt.Printf("Task %d: 세마포어 획득 실패: %v\n", id, err)
return
}
defer sem.Release()

fmt.Printf("Task %d 시작 (동시 실행: %d/3)\n", id, len(sem))
time.Sleep(200 * time.Millisecond)
fmt.Printf("Task %d 완료\n", id)
}(i)
}

wg.Wait()
fmt.Println("모든 작업 완료")
}

고루틴 누수 방지 체크리스트

package main

import (
"context"
"fmt"
"time"
)

// ✅ 1. 항상 context 또는 done 채널로 종료 조건 제공
func workerWithContext(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return // 정상 종료
default:
time.Sleep(100 * time.Millisecond)
}
}
}()
}

// ✅ 2. 버퍼드 채널로 전송 블로킹 방지
func safeProducer(results chan<- string) {
// 채널이 꽉 찼을 때를 대비한 처리
select {
case results <- "result":
default:
fmt.Println("결과 채널이 꽉 참, 건너뜀")
}
}

// ✅ 3. 고루틴 시작 전 패닉 방지
func safeGoroutine(f func()) {
go func() {
defer func() {
if r := recover(); r != nil {
fmt.Println("고루틴 패닉 복구:", r)
}
}()
f()
}()
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

workerWithContext(ctx)
time.Sleep(1500 * time.Millisecond)
fmt.Println("완료")
}

sync.Pool — 객체 재사용

자주 할당/해제되는 객체를 재사용해서 GC 압박을 줄입니다.

package main

import (
"bytes"
"fmt"
"sync"
)

var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
}

func processRequest(data string) string {
// 풀에서 버퍼 가져오기
buf := bufferPool.Get().(*bytes.Buffer)
buf.Reset() // 이전 내용 초기화
defer bufferPool.Put(buf) // 사용 후 반납

buf.WriteString("처리됨: ")
buf.WriteString(data)
return buf.String()
}

func main() {
var wg sync.WaitGroup
results := make([]string, 100)

for i := 0; i < 100; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
results[i] = processRequest(fmt.Sprintf("요청-%d", i))
}()
}

wg.Wait()
fmt.Println("첫 5개 결과:", results[:5])
}

sync.Pool 특징:

  • GC 시 풀이 비워질 수 있음 — 캐시 목적, 영구 저장 불가
  • Get 후 반드시 Put 호출
  • 상태가 있는 객체는 Reset 후 반납

동시성 패턴 요약

패턴사용 시점도구
워커 풀작업 수 제한채널 + WaitGroup
파이프라인단계적 데이터 처리채널 체인
Fan-out/Fan-in병렬 처리 + 결과 수집채널 + WaitGroup
세마포어동시 실행 수 제한버퍼드 채널
게이트웨이요청 속도 제한time.Ticker + 채널
객체 풀메모리 재사용sync.Pool