Worker Pools in Go: Design Patterns and Production Pitfalls
Worker pools are one of the most common concurrency patterns in Go. Spawn N goroutines, feed them work through a channel, collect results. Simple in concept — surprisingly tricky in production.
The Basic Pool
// Pool is a fixed-size worker pool: `workers` goroutines consume from
// the buffered jobs channel and publish to results.
type Pool struct {
	workers int            // number of goroutines launched by Start
	jobs    chan Job       // inbound work; closed by Shutdown to stop workers
	results chan Result    // outbound results; closed once all workers exit
	wg      sync.WaitGroup // tracks live workers so Shutdown can wait
}
// NewPool builds a pool that will run `workers` goroutines and buffers
// up to queueSize pending jobs and queueSize undelivered results.
// Call Start to launch the workers.
func NewPool(workers, queueSize int) *Pool {
	p := &Pool{workers: workers}
	p.jobs = make(chan Job, queueSize)
	p.results = make(chan Result, queueSize)
	return p
}
// Start launches the configured number of workers. Each runs until ctx
// is cancelled or the jobs channel is closed.
func (p *Pool) Start(ctx context.Context) {
	for id := 0; id < p.workers; id++ {
		p.wg.Add(1)
		go p.worker(ctx, id)
	}
}
// worker pulls jobs until cancellation or channel close, executing each
// and delivering its result. The result send is also guarded by ctx so
// a worker can never wedge on a full results channel during shutdown.
func (p *Pool) worker(ctx context.Context, id int) {
	defer p.wg.Done()
	done := ctx.Done()
	for {
		var job Job
		var open bool
		select {
		case <-done:
			return
		case job, open = <-p.jobs:
			if !open {
				return
			}
		}
		res := job.Execute(ctx)
		select {
		case <-done:
			return
		case p.results <- res:
		}
	}
}
// Submit enqueues a job for processing.
// WARNING: if the jobs buffer is full this blocks the caller
// indefinitely — see Pitfall 1 for the non-blocking variant.
func (p *Pool) Submit(job Job) {
	p.jobs <- job
}
// Shutdown drains the pool: it stops accepting work, waits for in-flight
// jobs, then closes results so readers see end-of-stream.
// NOTE(review): must not race with Submit — a Submit after close(p.jobs)
// panics; callers must stop submitting first.
func (p *Pool) Shutdown() {
	// Closing jobs signals workers to finish queued work and exit.
	close(p.jobs)
	// Wait for every worker before closing results, so no worker can
	// send on a closed channel.
	p.wg.Wait()
	close(p.results)
}
This works for simple cases. But production needs more.
Pitfall 1: Blocking Submits
If the job channel is full, Submit blocks the caller. This can cascade — if the caller is an HTTP handler, the request hangs.
// Submit enqueues job without blocking forever. It fails fast when ctx
// is cancelled and returns ErrPoolOverloaded after 5s of backpressure,
// letting the caller decide what to do — retry, queue elsewhere, or 503.
func (p *Pool) Submit(ctx context.Context, job Job) error {
	// Use an explicit timer rather than time.After: before Go 1.23 a
	// timer created by time.After is not released until it fires, so
	// every fast Submit would leave a 5-second timer pending.
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	select {
	case p.jobs <- job:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return ErrPoolOverloaded
	}
}
Return an error instead of blocking forever. Let the caller decide what to do — retry, queue elsewhere, or return 503.
Pitfall 2: Panic Recovery
A panic left unrecovered in any goroutine crashes the entire process — not just that worker. One bad job shouldn’t take down the whole service:
// worker consumes jobs until ctx is cancelled or the jobs channel is
// closed, delegating each job to safeExecute so that a panicking job
// never takes the goroutine (or the process) down with it.
func (p *Pool) worker(ctx context.Context, id int) {
	defer p.wg.Done()
	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case job, open := <-p.jobs:
			if !open {
				return
			}
			p.safeExecute(ctx, id, job)
		}
	}
}
// safeExecute runs one job, converting a panic into a logged, counted,
// ordinary error instead of crashing the process. It returns the
// outcome so callers (see the metrics worker) can track failures, and
// it guards the result send with ctx so a shutdown cannot wedge a
// worker on a full results channel. The named return is required so the
// deferred recover can overwrite err.
func (p *Pool) safeExecute(ctx context.Context, workerID int, job Job) (err error) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("worker panic recovered",
				"worker_id", workerID,
				"panic", r,
				"stack", string(debug.Stack()),
			)
			panicCounter.Inc()
			// Surface the panic to the caller as a normal error.
			err = fmt.Errorf("job panicked: %v", r)
		}
	}()
	result := job.Execute(ctx)
	// A bare `p.results <- result` could block forever during shutdown;
	// pair the send with ctx like the basic worker does.
	select {
	case p.results <- result:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}
Pitfall 3: No Visibility
You can’t fix what you can’t see:
// PoolMetrics holds cheap, lock-free counters describing pool health.
type PoolMetrics struct {
	activeWorkers atomic.Int64 // jobs currently executing
	jobsProcessed atomic.Int64 // jobs completed without error
	jobsFailed    atomic.Int64 // jobs completed with error
	queueDepth    func() int   // sampled on demand — presumably wired to len(jobs); not shown here
}
// worker consumes jobs and records per-job metrics. Unlike the plain
// `for job := range p.jobs` form, it also watches ctx — otherwise a
// cancelled pool whose jobs channel is never closed would leave workers
// blocked on the receive forever.
func (p *Pool) worker(ctx context.Context, id int) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case job, ok := <-p.jobs:
			if !ok {
				return
			}
			p.metrics.activeWorkers.Add(1)
			start := time.Now()
			// safeExecute must return error for this to compile — have
			// it surface both panics and Execute failures (Pitfall 2).
			err := p.safeExecute(ctx, id, job)
			p.metrics.activeWorkers.Add(-1)
			jobDuration.Observe(time.Since(start).Seconds())
			if err != nil {
				p.metrics.jobsFailed.Add(1)
			} else {
				p.metrics.jobsProcessed.Add(1)
			}
		}
	}
}
Dynamic Scaling
Sometimes you need more workers during peak and fewer during quiet periods:
// DynamicPool grows and shrinks its worker count between minWorkers and
// maxWorkers based on observed load (see scale).
type DynamicPool struct {
	minWorkers int                // floor for the worker count
	maxWorkers int                // ceiling for the worker count
	jobs       chan Job           // inbound work queue
	active     atomic.Int64       // in-flight job count — presumably updated by the worker (not shown); verify
	cancel     context.CancelFunc // stops scale and all workers
	mu         sync.Mutex         // guards workers
	workers    int                // current worker count; written only under mu
}
// scale periodically adjusts the worker count based on queue depth.
// It runs until ctx is cancelled.
// NOTE(review): workers launched here are not tracked by any WaitGroup,
// so a shutdown cannot wait for them to finish — confirm the lifecycle
// is handled elsewhere.
func (p *DynamicPool) scale(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Snapshot load before taking the lock; both values may be
			// slightly stale when acted on — acceptable for advisory
			// scaling decisions.
			queueLen := len(p.jobs)
			active := int(p.active.Load())
			p.mu.Lock()
			if queueLen > p.workers && p.workers < p.maxWorkers {
				// Scale up
				newWorkers := min(p.workers+2, p.maxWorkers)
				for i := p.workers; i < newWorkers; i++ {
					go p.worker(ctx, i)
				}
				slog.Info("scaled up", "workers", newWorkers)
				p.workers = newWorkers
			} else if queueLen == 0 && active < p.workers/2 && p.workers > p.minWorkers {
				// Scale down happens naturally as excess workers find no jobs
				// NOTE(review): this only holds if DynamicPool.worker (not
				// shown) exits after an idle timeout. A worker that blocks
				// on the jobs channel never exits, and p.workers is never
				// decremented here — verify the worker implementation.
				slog.Info("workers idle, will scale down naturally")
			}
			p.mu.Unlock()
		}
	}
}
The Production-Ready Pool
Putting it all together:
// ProductionPool combines the lessons above: bounded workers, panic
// recovery, per-job timeouts, metrics, and structured logging.
type ProductionPool struct {
	workers int            // number of goroutines Run launches
	jobs    chan Job       // inbound work; closed by Run during shutdown
	wg      sync.WaitGroup // tracks workers so Run can wait for them
	metrics *PoolMetrics   // counters updated per job
	logger  *slog.Logger   // injected structured logger
}
// Run launches the workers and blocks until ctx is cancelled and every
// worker has exited. It always returns nil; the error return is kept
// for interface compatibility (e.g. errgroup.Group).
func (p *ProductionPool) Run(ctx context.Context) error {
	p.logger.Info("starting worker pool", "workers", p.workers)
	for i := 0; i < p.workers; i++ {
		p.wg.Add(1)
		go func(id int) {
			defer p.wg.Done()
			p.logger.Info("worker started", "id", id)
			for {
				select {
				case <-ctx.Done():
					p.logger.Info("worker stopping", "id", id)
					return
				case job, ok := <-p.jobs:
					if !ok {
						return
					}
					p.runJob(ctx, id, job)
				}
			}
		}(i)
	}
	<-ctx.Done()
	// NOTE: workers also select on ctx.Done, which has already fired,
	// so closing the channel does NOT guarantee queued jobs are drained
	// — most workers exit via the cancellation branch. The close only
	// releases any worker parked on the receive. Producers must stop
	// calling Submit before cancellation, or a send on the closed
	// channel will panic.
	close(p.jobs)
	p.wg.Wait()
	p.logger.Info("all workers stopped")
	return nil
}

// runJob executes a single job with panic recovery, a per-job timeout,
// and metrics/latency accounting. Extracted so each job's defers fire
// at job end, not worker exit.
func (p *ProductionPool) runJob(ctx context.Context, id int, job Job) {
	p.metrics.activeWorkers.Add(1)
	start := time.Now()
	defer func() {
		if r := recover(); r != nil {
			p.logger.Error("panic", "worker", id, "panic", r)
			// A panicked job previously counted as neither processed
			// nor failed, under-reporting errors; count it as failed.
			p.metrics.jobsFailed.Add(1)
		}
		p.metrics.activeWorkers.Add(-1)
		jobDuration.Observe(time.Since(start).Seconds())
	}()
	jobCtx, cancel := context.WithTimeout(ctx, job.Timeout)
	defer cancel()
	if err := job.Execute(jobCtx); err != nil {
		p.metrics.jobsFailed.Add(1)
		p.logger.Error("job failed", "id", id, "error", err)
	} else {
		p.metrics.jobsProcessed.Add(1)
	}
}
Worker pools are deceptively simple. The difference between a toy implementation and a production one is error recovery, visibility, backpressure handling, and graceful shutdown.