Pro Tips — Concurrency Patterns and Pitfalls
Data Race Detection — Race Detector
Go can detect data races at runtime when a program is built, tested, or run with the -race flag.
# Enable race detector in build
go build -race ./...
# Race detector in tests
go test -race ./...
# Run with race detector
go run -race main.go
package main
import (
	"fmt"
	"sync"
	"sync/atomic"
)
// ❌ Data race — detected with go run -race
//
// withRace starts 100 goroutines that each increment the same local counter
// without any synchronization, waits for all of them, and prints the result.
// The unsynchronized increment is the race this example exists to trigger, so
// the printed value is not guaranteed to be 100.
func withRace() {
count := 0
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
count++ // DATA RACE! Unsynchronized read-modify-write shared by all goroutines.
}()
}
wg.Wait() // Blocks until every goroutine has called Done.
fmt.Println(count)
}
// ✅ Fixed with atomic package
//
// withAtomic is the race-free counterpart of withRace: it performs the same
// 100 concurrent increments through sync/atomic and prints the final count,
// which is always 100.
func withAtomic() {
	fmt.Println(countAtomically(100))
}

// countAtomically starts n goroutines that each add 1 to a shared counter with
// atomic.AddInt64, waits for all of them to finish, and returns the total.
func countAtomically(n int) int64 {
	var count int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&count, 1) // Atomic read-modify-write: no data race.
		}()
	}
	wg.Wait()
	// Load atomically as well so every access to count goes through sync/atomic.
	return atomic.LoadInt64(&count)
}
// main runs only the racy variant. Run it with `go run -race main.go` to see
// the race detector's report for withRace.
func main() {
withRace()
}
Race detector tips:
- Always include `go test -race ./...` in CI/CD pipelines
- Run `-race` builds in staging environments, even for services that ship to production without it
- Detection is accurate but not exhaustive: only races on code paths that actually execute are reported
Channel vs Mutex Decision Guide
Choose channels when:
- Passing data from one goroutine to another
- Distributing work / collecting results
- Building async pipelines
- Timeout or cancellation needed
Choose Mutex when:
- Protecting shared state (caches, counters)
- Critical sections are short and simple
- Protecting struct internal state
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// Simple counter: atomic package is most appropriate
//
// AtomicCounter is a counter whose methods may be called from many goroutines
// at once; every access to the underlying value goes through sync/atomic.
type AtomicCounter struct {
	count int64 // touched only via atomic.AddInt64 / atomic.LoadInt64
}

// Inc atomically adds one to the counter.
func (ac *AtomicCounter) Inc() {
	const step int64 = 1
	atomic.AddInt64(&ac.count, step)
}

// Value atomically reads the counter and returns the current total.
func (ac *AtomicCounter) Value() int64 {
	current := atomic.LoadInt64(&ac.count)
	return current
}
// Complex state: Mutex
//
// Stats holds several related metrics. RecordHit updates more than one field
// per call, so a single mutex guards the struct.
type Stats struct {
	mu      sync.Mutex
	hits    int
	misses  int
	latency []float64
}

// RecordHit stores the given latency sample and counts one more hit, holding
// the mutex for the whole update.
func (st *Stats) RecordHit(latency float64) {
	st.mu.Lock()
	defer st.mu.Unlock()

	st.latency = append(st.latency, latency)
	st.hits++
}
// Work distribution: channels
//
// distribute squares every value in tasks using a pool of worker goroutines
// and returns the squares. Results are collected in completion order, so the
// order of the returned slice is not guaranteed to match tasks.
//
// A workers value below 1 is treated as 1 so that queued tasks are always
// consumed and the collection loop cannot block forever.
func distribute(tasks []int, workers int) []int {
	if workers < 1 {
		workers = 1
	}
	// Both channels are buffered to len(tasks), so neither the producer loop
	// below nor any worker can block on a send.
	ch := make(chan int, len(tasks))
	results := make(chan int, len(tasks))
	for i := 0; i < workers; i++ {
		go func() {
			for t := range ch {
				results <- t * t
			}
		}()
	}
	for _, t := range tasks {
		ch <- t
	}
	close(ch) // Lets each worker's range loop end once the queue drains.
	out := make([]int, 0, len(tasks))
	for range tasks {
		out = append(out, <-results)
	}
	return out
}
// main increments one shared AtomicCounter from 1000 goroutines and prints the
// total once every goroutine has finished.
func main() {
	const goroutines = 1000

	var (
		counter AtomicCounter
		wg      sync.WaitGroup
	)
	wg.Add(goroutines)
	for remaining := goroutines; remaining > 0; remaining-- {
		go func() {
			defer wg.Done()
			counter.Inc()
		}()
	}
	wg.Wait()
	fmt.Println("Count:", counter.Value())
}
Worker Pool Pattern
Limit goroutine count to protect system resources.
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Job is one unit of work submitted to the worker pool.
type Job struct {
ID int // Copied into Result.JobID by the worker that handles the job.
Data string // Payload; the worker embeds it in the Result's Output text.
}
// Result is what a worker reports back for a single Job.
type Result struct {
JobID int // ID of the Job this result belongs to.
Output string // Text describing which worker processed which payload.
Err error // Not set by the worker in this example; presumably reserved for reporting failures.
}
// worker consumes jobs until the jobs channel is closed or ctx is cancelled,
// sending one Result per processed job. It calls wg.Done when it returns.
//
// Parameters:
//   - ctx: cancellation signal, honoured both while waiting for a job and
//     while delivering a result.
//   - id: worker number, used only in log and output text.
//   - jobs: source of work; closing it makes the worker exit.
//   - results: destination for finished work.
//   - wg: WaitGroup the caller uses to wait for this worker.
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				fmt.Printf("Worker %d: jobs channel closed, exiting\n", id)
				return
			}
			time.Sleep(100 * time.Millisecond) // Simulated work.
			res := Result{
				JobID:  job.ID,
				Output: fmt.Sprintf("[Worker %d] processed: %s", id, job.Data),
			}
			// Deliver the result, but give up if ctx is cancelled first so the
			// worker cannot leak when nobody is receiving from results. A
			// result finished at the moment of cancellation may be discarded.
			select {
			case results <- res:
			case <-ctx.Done():
				fmt.Printf("Worker %d: context cancelled, exiting\n", id)
				return
			}
		case <-ctx.Done():
			fmt.Printf("Worker %d: context cancelled, exiting\n", id)
			return
		}
	}
}
// runWorkerPool processes jobList with a fixed number of worker goroutines and
// returns the results in completion order (not jobList order).
//
// numWorkers below 1 is treated as 1 so that submitted jobs are never silently
// dropped. If ctx is cancelled before all jobs are handled, the workers stop
// early and only the results produced so far are returned.
func runWorkerPool(ctx context.Context, numWorkers int, jobList []Job) []Result {
	if numWorkers < 1 {
		numWorkers = 1
	}
	// Both channels are buffered to the job count, so queuing every job up
	// front never blocks and workers never block on delivering a result.
	jobs := make(chan Job, len(jobList))
	results := make(chan Result, len(jobList))
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(ctx, i+1, jobs, results, &wg)
	}
	for _, job := range jobList {
		jobs <- job
	}
	close(jobs) // No more work: workers exit once the queue is drained.
	// Close results only after every worker has returned, which ends the
	// collection loop below.
	go func() {
		wg.Wait()
		close(results)
	}()
	var out []Result
	for r := range results {
		out = append(out, r)
	}
	return out
}
// main builds ten jobs, runs them on a three-worker pool under a five-second
// timeout, and prints how many completed followed by each result.
func main() {
	const (
		jobCount    = 10
		workerCount = 3
	)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	pending := make([]Job, 0, jobCount)
	for n := 1; n <= jobCount; n++ {
		pending = append(pending, Job{ID: n, Data: fmt.Sprintf("task-%d", n)})
	}

	done := runWorkerPool(ctx, workerCount, pending)
	fmt.Printf("Completed jobs: %d\n", len(done))
	for _, res := range done {
		fmt.Printf(" [%d] %s\n", res.JobID, res.Output)
	}
}
Semaphore Pattern — Limiting Concurrency
Use a buffered channel as a semaphore to limit concurrent execution.
package main
import (
"context"
"fmt"
"sync"
"time"
)
// Semaphore is a counting semaphore built on a buffered channel: the channel
// capacity is the maximum number of holders, and each buffered element
// represents one acquired slot.
type Semaphore chan struct{}

// NewSemaphore returns a Semaphore that allows up to n concurrent holders.
func NewSemaphore(n int) Semaphore {
	return make(Semaphore, n)
}

// Acquire takes one slot, blocking until a slot is free or ctx is done.
// It returns nil once the slot is held and ctx.Err() otherwise.
//
// A context that is already done always fails, even when a slot is free:
// select picks at random among ready cases, so without the up-front check a
// cancelled caller could still get through.
func (s Semaphore) Acquire(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	select {
	case s <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Release frees one slot. Call it exactly once per successful Acquire; it
// blocks if no slot is currently held.
func (s Semaphore) Release() {
	<-s
}
// main launches ten tasks but lets at most three of them run at the same time.
func main() {
	const (
		limit = 3
		tasks = 10
	)
	var (
		sem = NewSemaphore(limit) // Max 3 concurrent executions
		ctx = context.Background()
		wg  sync.WaitGroup
	)

	// run is the body of one task: take a slot, do the simulated work, and
	// give the slot back.
	run := func(id int) {
		defer wg.Done()
		err := sem.Acquire(ctx)
		if err != nil {
			fmt.Printf("Task %d: failed to acquire semaphore: %v\n", id, err)
			return
		}
		defer sem.Release()

		fmt.Printf("Task %d starting (concurrent: %d/3)\n", id, len(sem))
		time.Sleep(200 * time.Millisecond)
		fmt.Printf("Task %d done\n", id)
	}

	wg.Add(tasks)
	for id := 0; id < tasks; id++ {
		go run(id)
	}
	wg.Wait()
	fmt.Println("All tasks complete")
}
Goroutine Leak Prevention Checklist
package main
import (
"context"
"fmt"
"time"
)
// ✅ 1. Always provide an exit condition via context or done channel
//
// workerWithContext starts a background goroutine and returns immediately.
// The goroutine checks ctx before each 100ms unit of simulated work and
// returns as soon as it observes that ctx is done.
func workerWithContext(ctx context.Context) {
	loop := func() {
		done := ctx.Done()
		for {
			select {
			case <-done:
				return // Clean exit
			default:
			}
			time.Sleep(100 * time.Millisecond)
		}
	}
	go loop()
}
// ✅ 2. Use buffered channels and non-blocking sends so producers never block
//
// safeProducer attempts a non-blocking send of "result" on results. If the
// channel cannot accept the value right now (its buffer is full, or it is
// unbuffered and no receiver is waiting), the value is dropped and a notice is
// printed instead.
func safeProducer(results chan<- string) {
	select {
	case results <- "result":
		return
	default:
	}
	fmt.Println("Results channel full, skipping")
}
// ✅ 3. Recover from panics inside the goroutine
//
// safeGoroutine runs f in a new goroutine. A deferred recover inside that
// goroutine catches any panic raised by f and prints it, so the panic does not
// take down the whole program.
func safeGoroutine(f func()) {
	// guard must be the deferred function itself: recover only has an effect
	// when called directly by a deferred function.
	guard := func() {
		r := recover()
		if r == nil {
			return
		}
		fmt.Println("Goroutine panic recovered:", r)
	}
	go func() {
		defer guard()
		f()
	}()
}
// main starts the context-aware worker with a one-second timeout, then sleeps
// for 1.5 seconds, which is longer than the timeout, before printing "Done".
func main() {
	const (
		timeout = 1 * time.Second
		wait    = 1500 * time.Millisecond
	)

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	workerWithContext(ctx)
	time.Sleep(wait)
	fmt.Println("Done")
}
sync.Pool — Object Reuse
Reuse frequently allocated/deallocated objects to reduce GC pressure.
package main
import (
"bytes"
"fmt"
"sync"
)
// bufferPool hands out reusable *bytes.Buffer values; New supplies a fresh
// buffer whenever the pool has none to offer.
var bufferPool = sync.Pool{
	New: func() any { return &bytes.Buffer{} },
}

// processRequest returns data prefixed with "processed: ". The string is built
// in a buffer borrowed from bufferPool, which goes back to the pool when the
// function returns.
func processRequest(data string) string {
	const prefix = "processed: "

	b := bufferPool.Get().(*bytes.Buffer)
	defer bufferPool.Put(b) // Return to pool after use
	b.Reset()               // Clear contents left by a previous user

	b.WriteString(prefix)
	b.WriteString(data)
	return b.String()
}
// main handles 100 requests concurrently, each goroutine writing to its own
// slot of the results slice, and prints the first five results.
func main() {
	const total = 100

	var wg sync.WaitGroup
	results := make([]string, total)
	wg.Add(total)
	for i := range results {
		// Pass the index as an argument so each goroutine gets its own copy.
		go func(slot int) {
			defer wg.Done()
			results[slot] = processRequest(fmt.Sprintf("request-%d", slot))
		}(i)
	}
	wg.Wait()
	fmt.Println("First 5 results:", results[:5])
}
sync.Pool notes:
- Pool may be cleared during GC — use for caching, not permanent storage
- Always call Put after Get
- Reset stateful objects before reuse — either right after Get (as in the example above) or just before Put
Concurrency Pattern Summary
| Pattern | When to Use | Tools |
|---|---|---|
| Worker pool | Limit goroutine count | Channel + WaitGroup |
| Pipeline | Stage-by-stage processing | Channel chain |
| Fan-out/Fan-in | Parallel processing + result collection | Channel + WaitGroup |
| Semaphore | Limit concurrent executions | Buffered channel |
| Rate limiter | Control request rate | time.Ticker + channel |
| Object pool | Memory reuse | sync.Pool |