Advanced Concurrency Patterns Every Go Engineer Should Know
Go’s concurrency primitives are simple. The patterns built on them are where the real power lives. These are the patterns I reach for regularly in production code.
Fan-Out, Fan-In
Distribute work across multiple goroutines, then collect results:
func processItems(ctx context.Context, items []Item) ([]Result, error) {
numWorkers := min(len(items), runtime.NumCPU())
jobs := make(chan Item, len(items))
results := make(chan Result, len(items))
// Fan-out: start workers
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range jobs {
result, err := processItem(ctx, item)
if err != nil {
slog.Error("process item", "id", item.ID, "error", err)
					continue // failed items are logged and skipped; the function never returns a per-item error
}
results <- result
}
}()
}
// Send jobs
for _, item := range items {
jobs <- item
}
close(jobs)
// Fan-in: collect results
go func() {
wg.Wait()
close(results)
}()
var collected []Result
for result := range results {
collected = append(collected, result)
}
return collected, nil
}
Pipeline Pattern
Chain processing stages connected by channels:
func pipeline(ctx context.Context, input <-chan RawData) <-chan ProcessedData {
validated := validate(ctx, input)
enriched := enrich(ctx, validated)
transformed := transform(ctx, enriched)
return transformed
}
func validate(ctx context.Context, in <-chan RawData) <-chan ValidData {
out := make(chan ValidData)
go func() {
defer close(out)
for data := range in {
if data.IsValid() {
select {
case out <- ValidData(data):
case <-ctx.Done():
return
}
}
}
}()
return out
}
Each stage runs concurrently and independently of the others, and because the channels are unbuffered, a slow stage automatically applies backpressure to the stages upstream of it.
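To drive the pipeline, feed one end and range over the other. A quick sketch, assuming enrich and transform follow the same shape as validate (runPipeline and the slice-fed input are illustrative, not part of the pattern):

func runPipeline(ctx context.Context, raw []RawData) []ProcessedData {
	input := make(chan RawData)
	go func() {
		defer close(input)
		for _, r := range raw {
			select {
			case input <- r:
			case <-ctx.Done():
				return
			}
		}
	}()

	var results []ProcessedData
	for p := range pipeline(ctx, input) {
		results = append(results, p)
	}
	return results
}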
Singleflight: Deduplicate Concurrent Calls
When 100 goroutines request the same cache key simultaneously, only one should hit the database:
import "golang.org/x/sync/singleflight"
type CachedService struct {
db *Database
cache *Cache
group singleflight.Group
}
func (s *CachedService) GetUser(ctx context.Context, id string) (*User, error) {
// Check cache first
if user, err := s.cache.Get(ctx, id); err == nil {
return user, nil
}
// Deduplicate concurrent DB calls for the same ID
result, err, _ := s.group.Do(id, func() (interface{}, error) {
user, err := s.db.GetUser(ctx, id)
if err != nil {
return nil, err
}
s.cache.Set(ctx, id, user, 5*time.Minute)
return user, nil
})
if err != nil {
return nil, err
}
return result.(*User), nil
}
100 concurrent requests for the same user produce a single database query; the other 99 callers wait and receive the same result. This prevents a thundering herd on cache misses.
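You can see the effect with a quick illustrative harness; count the calls inside db.GetUser and the number should stay at one even under this load (demoThunderingHerd and the hard-coded ID are assumptions for the sketch):

func demoThunderingHerd(ctx context.Context, svc *CachedService) {
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All 100 goroutines miss the cache at roughly the same time,
			// but singleflight collapses them into one db.GetUser call.
			if _, err := svc.GetUser(ctx, "user-42"); err != nil {
				slog.Error("get user", "error", err)
			}
		}()
	}
	wg.Wait()
}

One caveat: the closure runs under the context of whichever caller triggered it, so if that first request is cancelled, every caller waiting on the same key gets the cancellation error.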
errgroup: Structured Concurrency
The golang.org/x/sync/errgroup package is the de facto standard for running concurrent operations with error handling:
import "golang.org/x/sync/errgroup"
func fetchDashboardData(ctx context.Context, userID string) (*Dashboard, error) {
g, ctx := errgroup.WithContext(ctx)
var orders []Order
var stats *Stats
var notifications []Notification
g.Go(func() error {
var err error
orders, err = orderService.GetRecent(ctx, userID)
return err
})
g.Go(func() error {
var err error
stats, err = statsService.Get(ctx, userID)
return err
})
g.Go(func() error {
var err error
notifications, err = notifService.GetUnread(ctx, userID)
return err
})
if err := g.Wait(); err != nil {
return nil, err // First error cancels all goroutines via ctx
}
return &Dashboard{Orders: orders, Stats: stats, Notifications: notifications}, nil
}
If any goroutine returns an error, the shared context is cancelled; the other goroutines see that cancellation as long as they respect ctx (for example, by passing it to their downstream calls).
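Here's a toy sketch of that behavior; the "boom" error and the timer are placeholders:

func cancellationExample(ctx context.Context) error {
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		return errors.New("boom") // first failure cancels ctx for the whole group
	})

	g.Go(func() error {
		select {
		case <-time.After(10 * time.Second):
			return nil
		case <-ctx.Done():
			return ctx.Err() // unblocks almost immediately after "boom"
		}
	})

	return g.Wait() // returns "boom", the first error
}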
Semaphore: Bounded Concurrency
Limit how many goroutines run simultaneously:
import "golang.org/x/sync/semaphore"
var sem = semaphore.NewWeighted(10) // Max 10 concurrent operations
func processWithLimit(ctx context.Context, items []Item) error {
g, ctx := errgroup.WithContext(ctx)
for _, item := range items {
		item := item // capture the loop variable; not needed as of Go 1.22
g.Go(func() error {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)
return process(ctx, item)
})
}
return g.Wait()
}
Use this when the downstream system can’t handle unlimited concurrency (database connection limits, API rate limits).
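If the work is already running inside an errgroup, recent versions of golang.org/x/sync also offer Group.SetLimit, which caps concurrency without a separate semaphore. A sketch of the same function using that instead (processWithGroupLimit is just my name for the variant):

func processWithGroupLimit(ctx context.Context, items []Item) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(10) // at most 10 process calls in flight

	for _, item := range items {
		item := item
		g.Go(func() error { // blocks until a slot frees up
			return process(ctx, item)
		})
	}
	return g.Wait()
}

The weighted semaphore is still the better fit when the limit has to be shared across several groups or call sites, or when operations carry different weights.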
Or-Done Channel
Read from a channel but respect context cancellation:
func orDone(ctx context.Context, ch <-chan Value) <-chan Value {
out := make(chan Value)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
return
case v, ok := <-ch:
if !ok {
return
}
select {
case out <- v:
case <-ctx.Done():
return
}
}
}
}()
return out
}
This prevents goroutines from blocking forever on channel reads when the context is cancelled.
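Usage is then an ordinary range loop; results and handle here are just stand-ins:

func consume(ctx context.Context, results <-chan Value) {
	for v := range orDone(ctx, results) {
		handle(v) // loop ends when results closes or ctx is cancelled
	}
}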
Tee Channel
Send values from one channel to two consumers:
func tee(ctx context.Context, in <-chan Value) (<-chan Value, <-chan Value) {
out1 := make(chan Value)
out2 := make(chan Value)
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(ctx, in) {
o1, o2 := out1, out2
for i := 0; i < 2; i++ {
select {
case o1 <- val:
o1 = nil
case o2 <- val:
o2 = nil
case <-ctx.Done():
return
}
}
}
}()
return out1, out2
}
Useful when the same stream of data needs to go to two independent processors.
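Wiring it up might look like this; recordAudit and updateMetrics are hypothetical consumers:

func splitStream(ctx context.Context, in <-chan Value) {
	audit, metrics := tee(ctx, in)

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range audit {
			recordAudit(v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range metrics {
			updateMetrics(v)
		}
	}()
	wg.Wait()
}

Note that tee won't send the next value until both outputs have accepted the current one, so a slow consumer on either side stalls the other.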
When to Use What
| Pattern | Use When |
|---|---|
| Fan-out/Fan-in | Processing a batch of independent items |
| Pipeline | Multi-stage data transformation |
| Singleflight | Deduplicating concurrent identical requests |
| errgroup | Fetching data from multiple sources concurrently |
| Semaphore | Limiting concurrent access to a resource |
| Or-Done | Reading from channels with cancellation |
| Tee | Splitting one stream to two independent consumers |
These patterns compose well. A real system might use errgroup to fan out to multiple services, singleflight to deduplicate cache misses within each call, and semaphores to limit database connections. Know the patterns, and you’ll combine them naturally.
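To make that composition concrete, here's a rough sketch that reuses the pieces above. The loadPage function, the map plumbing, and the semaphore-guarded DB helper are all illustrative assumptions, not a prescription:

var dbSem = semaphore.NewWeighted(10) // shared budget for DB connections

// getUserFromDB is a hypothetical helper; imagine CachedService.GetUser
// calling this instead of hitting s.db directly.
func (s *CachedService) getUserFromDB(ctx context.Context, id string) (*User, error) {
	if err := dbSem.Acquire(ctx, 1); err != nil {
		return nil, err
	}
	defer dbSem.Release(1)
	return s.db.GetUser(ctx, id)
}

func loadPage(ctx context.Context, svc *CachedService, userIDs []string) (map[string]*User, error) {
	g, ctx := errgroup.WithContext(ctx)

	var mu sync.Mutex
	users := make(map[string]*User, len(userIDs))

	for _, id := range userIDs {
		id := id
		g.Go(func() error {
			// Fan-out via errgroup; duplicate IDs collapse into one lookup
			// via singleflight inside GetUser; the database itself is
			// protected by the weighted semaphore.
			u, err := svc.GetUser(ctx, id)
			if err != nil {
				return err
			}
			mu.Lock()
			users[id] = u
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return users, nil
}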