Advanced Concurrency Patterns Every Go Engineer Should Know

Go’s concurrency primitives are simple. The patterns built on them are where the real power lives. These are the patterns I reach for regularly in production code.

Fan-Out, Fan-In

Distribute work across multiple goroutines, then collect results:

func processItems(ctx context.Context, items []Item) ([]Result, error) {
    numWorkers := min(len(items), runtime.NumCPU()) // built-in min requires Go 1.21+

    jobs := make(chan Item, len(items))
    results := make(chan Result, len(items))

    // Fan-out: start workers
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range jobs {
                result, err := processItem(ctx, item)
                if err != nil {
                    slog.Error("process item", "id", item.ID, "error", err)
                    continue
                }
                results <- result
            }
        }()
    }

    // Send jobs
    for _, item := range items {
        jobs <- item
    }
    close(jobs)

    // Fan-in: collect results
    go func() {
        wg.Wait()
        close(results)
    }()

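    // Results arrive in completion order, not input order.
    var collected []Result
    for result := range results {
        collected = append(collected, result)
    }
    // Failed items were logged and skipped above, so the error is always nil.
    return collected, nil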
}

Pipeline Pattern

Chain processing stages connected by channels:

func pipeline(ctx context.Context, input <-chan RawData) <-chan ProcessedData {
    validated := validate(ctx, input)
    enriched := enrich(ctx, validated)
    transformed := transform(ctx, enriched)
    return transformed
}

func validate(ctx context.Context, in <-chan RawData) <-chan ValidData {
    out := make(chan ValidData)
    go func() {
        defer close(out)
        for data := range in {
            if data.IsValid() {
                select {
                case out <- ValidData(data):
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

Each stage runs in its own goroutine. Because the channels are unbuffered, a slow stage blocks the senders upstream of it, so backpressure comes for free.

Singleflight: Deduplicate Concurrent Calls

When 100 goroutines request the same cache key simultaneously, only one should hit the database:

import "golang.org/x/sync/singleflight"

type CachedService struct {
    db    *Database
    cache *Cache
    group singleflight.Group
}

func (s *CachedService) GetUser(ctx context.Context, id string) (*User, error) {
    // Check cache first
    if user, err := s.cache.Get(ctx, id); err == nil {
        return user, nil
    }

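    // Deduplicate concurrent DB calls for the same ID.
    // Note: the shared call runs with the first caller's ctx, so if that
    // caller cancels, every waiter sharing the call sees the same error.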
    result, err, _ := s.group.Do(id, func() (interface{}, error) {
        user, err := s.db.GetUser(ctx, id)
        if err != nil {
            return nil, err
        }
        s.cache.Set(ctx, id, user, 5*time.Minute)
        return user, nil
    })

    if err != nil {
        return nil, err
    }
    return result.(*User), nil
}

If 100 requests for the same user arrive while a query is in flight, they share a single database query: one caller runs it, and the other 99 wait and get the same result. This prevents a thundering herd on cache misses.
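The deduplication itself can be illustrated with a simplified, stdlib-only sketch. This is not the golang.org/x/sync implementation (which also reports whether a result was shared and handles panics). The group type, its waiting helper, and the "user:42" key are hypothetical names used only to show the idea and count the queries.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// call tracks one in-flight invocation for a key.
type call struct {
	done    chan struct{} // closed once fn has returned
	val     string
	err     error
	waiters int // callers that joined instead of running fn
}

// group is a simplified, hypothetical stand-in for singleflight.Group.
type group struct {
	mu    sync.Mutex
	calls map[string]*call
}

// Do runs fn at most once at a time per key; concurrent callers share the result.
func (g *group) Do(key string, fn func() (string, error)) (string, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		c.waiters++
		g.mu.Unlock()
		<-c.done // wait for the caller that is running fn
		return c.val, c.err
	}
	c := &call{done: make(chan struct{})}
	g.calls[key] = c
	g.mu.Unlock()

	c.val, c.err = fn()

	g.mu.Lock()
	delete(g.calls, key) // later callers start a fresh call
	g.mu.Unlock()
	close(c.done)
	return c.val, c.err
}

// waiting reports how many callers are currently blocked on key.
func (g *group) waiting(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.calls[key]; ok {
		return c.waiters
	}
	return 0
}

// demo issues n concurrent lookups for one key and returns how many
// times the simulated database query actually ran.
func demo(n int) int {
	var g group
	queries := 0
	release := make(chan struct{})

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Do("user:42", func() (string, error) {
				queries++ // only the caller running fn gets here
				<-release // simulate a slow query
				return "alice", nil
			})
		}()
	}

	// Hold the "query" open until every other caller has joined it.
	for g.waiting("user:42") < n-1 {
		time.Sleep(time.Millisecond)
	}
	close(release)
	wg.Wait()
	return queries
}

func main() {
	fmt.Println("queries:", demo(100)) // prints queries: 1
}
```

The demo keeps the query open on purpose so that all 100 callers overlap. In production, only callers that arrive while a call is in flight are deduplicated; a caller that arrives after it finishes starts a new one.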

errgroup: Structured Concurrency

The errgroup package from golang.org/x/sync (not part of the standard library) is the de facto standard for running concurrent operations with error handling:

import "golang.org/x/sync/errgroup"

func fetchDashboardData(ctx context.Context, userID string) (*Dashboard, error) {
    g, ctx := errgroup.WithContext(ctx)

    var orders []Order
    var stats *Stats
    var notifications []Notification

    g.Go(func() error {
        var err error
        orders, err = orderService.GetRecent(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        stats, err = statsService.Get(ctx, userID)
        return err
    })

    g.Go(func() error {
        var err error
        notifications, err = notifService.GetUnread(ctx, userID)
        return err
    })

    if err := g.Wait(); err != nil {
        return nil, err // First error cancels all goroutines via ctx
    }

    return &Dashboard{Orders: orders, Stats: stats, Notifications: notifications}, nil
}

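If any goroutine returns an error, the shared context is cancelled. The other goroutines stop early only if the calls they make honor ctx. Wait still blocks until all of them have returned, then reports the first error.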

Semaphore: Bounded Concurrency

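Limit how many operations run simultaneously. The code below still starts one goroutine per item; the semaphore caps how many of them are doing work at once: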

import "golang.org/x/sync/semaphore"

var sem = semaphore.NewWeighted(10) // Max 10 concurrent operations

func processWithLimit(ctx context.Context, items []Item) error {
    g, ctx := errgroup.WithContext(ctx)

    for _, item := range items {
        item := item // per-iteration copy; unnecessary as of Go 1.22
        g.Go(func() error {
            if err := sem.Acquire(ctx, 1); err != nil {
                return err
            }
            defer sem.Release(1)

            return process(ctx, item)
        })
    }

    return g.Wait()
}

Use this when the downstream system can’t handle unlimited concurrency (database connection limits, API rate limits).

Or-Done Channel

Read from a channel but respect context cancellation:

func orDone(ctx context.Context, ch <-chan Value) <-chan Value {
    out := make(chan Value)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-ch:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

This prevents goroutines from blocking forever on channel reads when the context is cancelled.
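A small usage sketch shows the difference in practice. It assumes a source channel that is never closed, and the takeThenCancel helper is hypothetical. A plain range over that source would hang after the last value, while the wrapped channel closes as soon as the context is cancelled. orDone is repeated here, specialized to int, so the example is self-contained.

```go
package main

import (
	"context"
	"fmt"
)

// orDone forwards values from ch until ch closes or ctx is cancelled.
func orDone(ctx context.Context, ch <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-ch:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// takeThenCancel reads n values (n must be positive) from a source that
// never closes, cancels the context, and returns how many values it saw
// before the wrapped channel closed.
func takeThenCancel(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	src := make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			src <- i
		}
		// src is never closed: `for range src` would block forever here.
	}()

	count := 0
	for range orDone(ctx, src) {
		count++
		if count == n {
			cancel() // we have what we need; unblock the reader
		}
	}
	return count
}

func main() {
	fmt.Println("received:", takeThenCancel(3)) // prints received: 3
}
```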

Tee Channel

Send values from one channel to two consumers:

func tee(ctx context.Context, in <-chan Value) (<-chan Value, <-chan Value) {
    out1 := make(chan Value)
    out2 := make(chan Value)

    go func() {
        defer close(out1)
        defer close(out2)

        for val := range orDone(ctx, in) {
            o1, o2 := out1, out2
            for i := 0; i < 2; i++ {
                select {
                case o1 <- val:
                    o1 = nil
                case o2 <- val:
                    o2 = nil
                case <-ctx.Done():
                    return
                }
            }
        }
    }()

    return out1, out2
}

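Useful when the same stream of data needs to go to two independent processors. Both outputs are unbuffered, so each value must be received by both consumers before the next one is read; a slow consumer stalls the other.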
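Here is a runnable usage sketch, with tee and orDone specialized to int and a hypothetical demo function driving them. Each consumer runs in its own goroutine because a value is not released until both outputs have accepted it.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// orDone forwards values from ch until ch closes or ctx is cancelled.
func orDone(ctx context.Context, ch <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-ch:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// tee duplicates every value from in onto two output channels.
func tee(ctx context.Context, in <-chan int) (<-chan int, <-chan int) {
	out1, out2 := make(chan int), make(chan int)
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range orDone(ctx, in) {
			o1, o2 := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case o1 <- val:
					o1 = nil // sent here; a nil channel disables this case
				case o2 <- val:
					o2 = nil
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out1, out2
}

// demo sends 1, 2, 3 through tee and returns what each consumer received.
func demo() ([]int, []int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 3; i++ {
			in <- i
		}
	}()

	a, b := tee(ctx, in)
	var gotA, gotB []int
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); for v := range a { gotA = append(gotA, v) } }()
	go func() { defer wg.Done(); for v := range b { gotB = append(gotB, v) } }()
	wg.Wait()
	return gotA, gotB
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // prints [1 2 3] [1 2 3]
}
```

Reading the two outputs one after the other in a single goroutine would deadlock, since tee cannot move on to the next value until both outputs have taken the current one.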

When to Use What

Pattern          Use When
Fan-out/Fan-in   Processing a batch of independent items
Pipeline         Multi-stage data transformation
Singleflight     Deduplicating concurrent identical requests
errgroup         Fetching data from multiple sources concurrently
Semaphore        Limiting concurrent access to a resource
Or-Done          Reading from channels with cancellation
Tee              Sending one stream to two independent processors

These patterns compose well. A real system might use errgroup to fan out to multiple services, singleflight to deduplicate cache misses within each call, and semaphores to limit database connections. Know the patterns, and you’ll combine them naturally.