Skip to main content

Pro Tips — Concurrency Patterns and Pitfalls

Data Race Detection — Race Detector

Go can detect runtime data races with the -race flag.

# Enable race detector in build
go build -race ./...

# Race detector in tests
go test -race ./...

# Run with race detector
go run -race main.go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ❌ Data race — 100 goroutines mutate the counter with no
// synchronization. Run with `go run -race` to see the detector report.
func withRace() {
	var wg sync.WaitGroup
	total := 0
	for g := 0; g < 100; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			total++ // DATA RACE! unsynchronized read-modify-write
		}()
	}
	wg.Wait()
	fmt.Println(total)
}

// ✅ Fixed with the sync/atomic package — the 100 concurrent increments
// are race-free. Returns the final count (always 100) so the fix is
// observable; the original sample had the atomic call commented out and
// therefore demonstrated nothing.
func withAtomic() int64 {
	var count int64
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1) // atomic read-modify-write: no data race
		}()
	}
	wg.Wait()
	return count
}

// Entry point: runs only the racy variant so `go run -race main.go`
// produces a detector report; withAtomic is the corrected counterpart.
func main() {
withRace()
}

Race detector tips:

  • Always include go test -race ./... in CI/CD pipelines
  • Run -race builds in staging environments to catch races before production; the detector's significant CPU and memory overhead usually makes it unsuitable for production servers themselves
  • Detection produces no false positives, but it is not exhaustive — it only reports races that actually occur on the executed code path

Channel vs Mutex Decision Guide

Choose channels when:
- Passing data from one goroutine to another
- Distributing work / collecting results
- Building async pipelines
- Timeout or cancellation needed

Choose Mutex when:
- Protecting shared state (caches, counters)
- Critical sections are short and simple
- Protecting struct internal state
package main

import (
"fmt"
"sync"
"sync/atomic"
)

// Simple counter: the sync/atomic package is the most appropriate tool —
// cheaper than a Mutex for a single machine-word value.
type AtomicCounter struct {
	count int64 // accessed only through atomic operations
}

// Inc atomically adds one to the counter.
func (c *AtomicCounter) Inc() {
	atomic.AddInt64(&c.count, 1)
}

// Value atomically reads the current count.
func (c *AtomicCounter) Value() int64 {
	return atomic.LoadInt64(&c.count)
}

// Complex state: a Mutex is the right tool when several related fields
// must be updated together under one critical section.
type Stats struct {
	mu      sync.Mutex
	hits    int
	misses  int
	latency []float64
}

// RecordHit counts one hit and stores its latency sample atomically
// with respect to other Stats methods.
func (s *Stats) RecordHit(latency float64) {
	s.mu.Lock()
	s.hits++
	s.latency = append(s.latency, latency)
	s.mu.Unlock()
}

// Work distribution: channels.
//
// distribute squares every task using a bounded set of worker goroutines
// and returns the results (output order is not guaranteed to match the
// input order). A non-positive workers value is clamped to 1; otherwise
// no goroutine would ever drain the jobs channel and the result-receive
// loop below would deadlock.
func distribute(tasks []int, workers int) []int {
	if workers < 1 {
		workers = 1
	}

	jobs := make(chan int, len(tasks))
	results := make(chan int, len(tasks))

	for i := 0; i < workers; i++ {
		go func() {
			for t := range jobs {
				results <- t * t
			}
		}()
	}

	// Buffered channel holds every task, so these sends never block.
	for _, t := range tasks {
		jobs <- t
	}
	close(jobs) // workers exit their range loop once drained

	out := make([]int, 0, len(tasks))
	for range tasks {
		out = append(out, <-results)
	}
	return out
}

// Demo: 1000 goroutines bump the counter concurrently; the atomic
// increments guarantee Value reports exactly 1000.
func main() {
	counter := &AtomicCounter{}
	var wg sync.WaitGroup
	wg.Add(1000)
	for i := 0; i < 1000; i++ {
		go func() {
			defer wg.Done()
			counter.Inc()
		}()
	}
	wg.Wait()
	fmt.Println("Count:", counter.Value())
}

Worker Pool Pattern

Limit goroutine count to protect system resources.

package main

import (
"context"
"fmt"
"sync"
"time"
)

// Job is one unit of work handed to a pool worker.
type Job struct {
ID int
Data string
}

// Result is the outcome a worker reports for a single Job.
type Result struct {
JobID int
Output string
Err error
}

// worker consumes jobs until the jobs channel is closed or ctx is
// cancelled, emitting one Result per processed Job. wg.Done fires
// exactly once on either exit path.
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d: context cancelled, exiting\n", id)
			return
		case j, open := <-jobs:
			if !open {
				fmt.Printf("Worker %d: jobs channel closed, exiting\n", id)
				return
			}
			time.Sleep(100 * time.Millisecond) // simulate work
			results <- Result{
				JobID:  j.ID,
				Output: fmt.Sprintf("[Worker %d] processed: %s", id, j.Data),
			}
		}
	}
}

// runWorkerPool fans jobList out to numWorkers workers and collects
// every Result. Both channels are buffered to the full job count, so
// producers and workers never block on capacity. A dedicated goroutine
// closes results once all workers have exited, which terminates the
// collection loop below.
func runWorkerPool(ctx context.Context, numWorkers int, jobList []Job) []Result {
	jobs := make(chan Job, len(jobList))
	results := make(chan Result, len(jobList))

	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 1; w <= numWorkers; w++ {
		go worker(ctx, w, jobs, results, &wg)
	}

	for _, j := range jobList {
		jobs <- j
	}
	close(jobs) // signal workers there is no more work

	go func() {
		wg.Wait()
		close(results) // safe: all senders have returned
	}()

	var collected []Result
	for r := range results {
		collected = append(collected, r)
	}
	return collected
}

// Demo: 10 jobs processed by 3 workers under a 5-second overall deadline.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	var jobs []Job
	for n := 1; n <= 10; n++ {
		jobs = append(jobs, Job{ID: n, Data: fmt.Sprintf("task-%d", n)})
	}

	results := runWorkerPool(ctx, 3, jobs)
	fmt.Printf("Completed jobs: %d\n", len(results))
	for _, r := range results {
		fmt.Printf(" [%d] %s\n", r.JobID, r.Output)
	}
}

Semaphore Pattern — Limiting Concurrency

Use a buffered channel as a semaphore to limit concurrent execution.

package main

import (
"context"
"fmt"
"sync"
"time"
)

// Semaphore limits concurrency: each held slot is one value sitting in
// the buffered channel, so the channel capacity is the maximum number
// of concurrent holders.
type Semaphore chan struct{}

// NewSemaphore returns a semaphore permitting n concurrent holders.
func NewSemaphore(n int) Semaphore {
	return make(Semaphore, n)
}

// Acquire claims a slot, blocking until one frees up or ctx is done.
// It returns ctx.Err() when the context wins the race.
func (s Semaphore) Acquire(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s <- struct{}{}:
		return nil
	}
}

// Release frees a slot previously claimed by a successful Acquire.
func (s Semaphore) Release() {
	<-s
}

// Demo: 10 tasks contend for 3 semaphore slots, so at most 3 run at once.
func main() {
	sem := NewSemaphore(3) // Max 3 concurrent executions
	ctx := context.Background()

	var wg sync.WaitGroup
	for n := 0; n < 10; n++ {
		wg.Add(1)
		go func(id int) { // id passed as argument: safe capture on any Go version
			defer wg.Done()
			if err := sem.Acquire(ctx); err != nil {
				fmt.Printf("Task %d: failed to acquire semaphore: %v\n", id, err)
				return
			}
			defer sem.Release()

			fmt.Printf("Task %d starting (concurrent: %d/3)\n", id, len(sem))
			time.Sleep(200 * time.Millisecond)
			fmt.Printf("Task %d done\n", id)
		}(n)
	}

	wg.Wait()
	fmt.Println("All tasks complete")
}

Goroutine Leak Prevention Checklist

package main

import (
"context"
"fmt"
"time"
)

// ✅ 1. Always provide an exit condition via context or done channel.
// The goroutine polls ctx every 100ms and stops once it is cancelled,
// so it cannot outlive the context — no leak.
func workerWithContext(ctx context.Context) {
	go func() {
		for ctx.Err() == nil { // nil until ctx is cancelled or times out
			time.Sleep(100 * time.Millisecond)
		}
	}()
}

// ✅ 2. Use a non-blocking send so the producer can never hang: when no
// receiver or buffer slot is ready, the value is dropped instead of the
// goroutine blocking (and leaking) forever.
func safeProducer(results chan<- string) {
	select {
	case results <- "result":
		// delivered
	default:
		fmt.Println("Results channel full, skipping")
	}
}

// ✅ 3. Install a recover inside the goroutine itself: a panic in f
// would otherwise crash the whole process, and recover only works in a
// deferred function running on the panicking goroutine.
func safeGoroutine(f func()) {
	go func() {
		defer func() {
			if rec := recover(); rec != nil {
				fmt.Println("Goroutine panic recovered:", rec)
			}
		}()
		f()
	}()
}

// Demo: start a context-bounded worker, then sleep past its 1s deadline
// so the goroutine's clean-exit path runs before the program ends.
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

workerWithContext(ctx)
time.Sleep(1500 * time.Millisecond)
fmt.Println("Done")
}

sync.Pool — Object Reuse

Reuse frequently allocated/deallocated objects to reduce GC pressure.

package main

import (
"bytes"
"fmt"
"sync"
)

// bufferPool hands out reusable *bytes.Buffer values, cutting per-request
// allocations (and GC pressure). The pool may be emptied by the GC, so
// New provides a fresh buffer on demand.
var bufferPool = sync.Pool{
	New: func() any {
		return new(bytes.Buffer)
	},
}

// processRequest renders "processed: <data>" using a pooled buffer.
// The buffer is reset before use and handed back to the pool afterwards.
func processRequest(data string) string {
	b := bufferPool.Get().(*bytes.Buffer)
	b.Reset() // a recycled buffer may still hold a previous request's bytes

	b.WriteString("processed: ")
	b.WriteString(data)
	out := b.String() // copy the contents out before recycling the buffer
	bufferPool.Put(b)
	return out
}

// Demo: 100 concurrent requests each write into their own slot of the
// results slice — disjoint indices, so no extra synchronization needed.
func main() {
	var wg sync.WaitGroup
	results := make([]string, 100)

	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(idx int) { // index passed as argument: safe on any Go version
			defer wg.Done()
			results[idx] = processRequest(fmt.Sprintf("request-%d", idx))
		}(i)
	}

	wg.Wait()
	fmt.Println("First 5 results:", results[:5])
}

sync.Pool notes:

  • Pool may be cleared during GC — use for caching, not permanent storage
  • Always call Put after Get
  • Reset stateful objects before returning to pool

Concurrency Pattern Summary

Pattern         | When to Use                             | Tools
----------------|-----------------------------------------|---------------------
Worker pool     | Limit job count                         | Channel + WaitGroup
Pipeline        | Stage-by-stage processing               | Channel chain
Fan-out/Fan-in  | Parallel processing + result collection | Channel + WaitGroup
Semaphore       | Limit concurrent executions             | Buffered channel
Rate limiter    | Control request rate                    | time.Ticker + channel
Object pool     | Memory reuse                            | sync.Pool