Building Resilient Workers in Go for Queue-Based Architectures
Queue-based workers are the workhorses of backend systems. They process payments, send emails, generate reports, sync data. When they break, things pile up silently until someone notices a million unprocessed messages.
Here’s how I build workers that don’t break.
The Worker Skeleton
Every worker I build follows this structure:
type Worker struct {
    consumer    Consumer
    handler     Handler
    concurrency int
    logger      *slog.Logger
}

func (w *Worker) Run(ctx context.Context) error {
    g, ctx := errgroup.WithContext(ctx)
    for i := 0; i < w.concurrency; i++ {
        id := i
        g.Go(func() error {
            return w.loop(ctx, id)
        })
    }
    return g.Wait()
}
func (w *Worker) loop(ctx context.Context, id int) error {
    w.logger.Info("worker started", "worker_id", id)
    defer w.logger.Info("worker stopped", "worker_id", id)

    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }

        msg, err := w.consumer.Receive(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return nil // Shutting down
            }
            w.logger.Error("receive failed", "error", err)
            time.Sleep(time.Second)
            continue
        }

        w.process(ctx, msg)
    }
}
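The skeleton leans on three types it never defines. Here's one minimal set of definitions it assumes — your broker client will shape the details, so treat these as a sketch rather than a fixed contract:

type Message interface {
    ID() string
    Body() []byte
    DeliveryCount() int // how many times the broker has handed this message out
    Ack() error         // remove the message from the queue
    Nack() error        // return it for redelivery
}

// Consumer blocks until a message is available or ctx is cancelled.
type Consumer interface {
    Receive(ctx context.Context) (Message, error)
}

// Handler processes one message body; a non-nil error triggers redelivery.
type Handler func(ctx context.Context, body []byte) error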
Graceful Shutdown
The most critical pattern. When a deploy happens, SIGTERM arrives. You must:
- Stop accepting new messages
- Finish processing in-flight messages
- Exit cleanly
func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    worker := NewWorker(config)

    slog.Info("starting worker")
    if err := worker.Run(ctx); err != nil {
        slog.Error("worker failed", "error", err)
        os.Exit(1)
    }
    slog.Info("worker shutdown complete")
}
When ctx is cancelled, consumer.Receive returns, each goroutine's loop exits after finishing its current message, and errgroup's Wait unblocks once every goroutine has returned.
Add a hard deadline for stubborn messages:
func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    worker := NewWorker(config)

    done := make(chan struct{})
    go func() {
        if err := worker.Run(ctx); err != nil {
            slog.Error("worker failed", "error", err)
        }
        close(done)
    }()

    <-ctx.Done()
    slog.Info("shutdown signal received")

    select {
    case <-done:
        slog.Info("clean shutdown")
    case <-time.After(30 * time.Second):
        slog.Error("forced shutdown after 30s timeout")
        os.Exit(1)
    }
}
Poison Message Handling
A poison message is one that always causes the handler to fail. Without detection, it loops forever: dequeue → fail → requeue → dequeue → fail.
func (w *Worker) process(ctx context.Context, msg Message) {
    deliveryCount := msg.DeliveryCount()
    if deliveryCount > 5 {
        w.logger.Error("poison message detected",
            "message_id", msg.ID(),
            "delivery_count", deliveryCount,
        )
        w.deadLetter(ctx, msg, "max delivery count exceeded")
        msg.Ack()
        return
    }

    if err := w.handler(ctx, msg.Body()); err != nil {
        w.logger.Error("handler failed",
            "message_id", msg.ID(),
            "delivery_count", deliveryCount,
            "error", err,
        )
        msg.Nack()
        return
    }

    msg.Ack()
}
Most message brokers track delivery count. Use it.
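deadLetter isn't defined anywhere above. A minimal sketch, assuming the Worker also carries a dlq field satisfying a small Publisher interface (both are additions, not part of the earlier struct):

// Publisher is an assumed interface over whatever client writes to the DLQ.
type Publisher interface {
    Publish(ctx context.Context, body []byte) error
}

func (w *Worker) deadLetter(ctx context.Context, msg Message, reason string) {
    // Wrap the original payload with enough context for whoever digs
    // through the DLQ later. The envelope shape is illustrative, not a standard.
    envelope, err := json.Marshal(map[string]any{
        "reason":         reason,
        "message_id":     msg.ID(),
        "delivery_count": msg.DeliveryCount(),
        "body":           string(msg.Body()),
    })
    if err != nil {
        w.logger.Error("dead-letter encode failed", "message_id", msg.ID(), "error", err)
        return
    }
    if err := w.dlq.Publish(ctx, envelope); err != nil {
        w.logger.Error("dead-letter publish failed", "message_id", msg.ID(), "error", err)
    }
}

If your broker can dead-letter natively (SQS redrive policies, RabbitMQ dead-letter exchanges), prefer that and keep a function like this only as a fallback.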
Backpressure
When your handler is slow (maybe the database is under load), you need to slow down consumption rather than buffering messages in memory:
type RateLimitedWorker struct {
    worker  *Worker
    limiter *rate.Limiter
}

func (w *RateLimitedWorker) loop(ctx context.Context, id int) error {
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }

        // Wait for a rate limit token
        if err := w.limiter.Wait(ctx); err != nil {
            return nil
        }

        msg, err := w.worker.consumer.Receive(ctx)
        if err != nil {
            time.Sleep(time.Second)
            continue
        }

        w.worker.process(ctx, msg)
    }
}
This caps throughput at a sustainable rate. If processing gets slower, messages stay in the queue where they’re safe, rather than piling up in memory.
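The limiter is the token bucket from golang.org/x/time/rate. Construction is one line; the numbers below are illustrative, not a recommendation:

// Allow roughly 100 messages per second with a burst of 10.
// Tune these to what your slowest downstream dependency can absorb.
limited := &RateLimitedWorker{
    worker:  worker, // the base *Worker from earlier
    limiter: rate.NewLimiter(rate.Limit(100), 10),
}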
Health Checks
Workers need health checks too, not just HTTP services:
type WorkerHealth struct {
    mu            sync.Mutex
    lastProcessed time.Time
    processing    int32
}

func (h *WorkerHealth) RecordProcessed() {
    h.mu.Lock()
    h.lastProcessed = time.Now()
    h.mu.Unlock()
}

func (h *WorkerHealth) IsHealthy() bool {
    h.mu.Lock()
    defer h.mu.Unlock()
    // Unhealthy if no messages processed in 5 minutes
    // and we're supposed to be consuming
    if time.Since(h.lastProcessed) > 5*time.Minute {
        return false
    }
    return true
}
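Two wiring details the snippet above leaves open: the zero value of lastProcessed makes a freshly started worker look unhealthy, and nothing calls RecordProcessed yet. A small constructor closes the first gap (NewWorkerHealth is a sketch, not part of the code above):

func NewWorkerHealth() *WorkerHealth {
    // Seed lastProcessed so a brand-new worker isn't reported
    // unhealthy before it has had a chance to receive anything.
    return &WorkerHealth{lastProcessed: time.Now()}
}

For the second, call health.RecordProcessed() at the end of process, after the Ack or Nack. And be careful with quiet queues: five minutes of silence can be perfectly normal, so gate the check on queue depth or in-flight work before letting a liveness probe restart an idle worker.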
Expose this via a simple HTTP endpoint so Kubernetes liveness probes can detect stuck workers:
go func() {
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if health.IsHealthy() {
            w.WriteHeader(http.StatusOK)
        } else {
            w.WriteHeader(http.StatusServiceUnavailable)
        }
    })
    http.ListenAndServe(":8081", nil)
}()
Metrics
The metrics that tell you everything:
- Messages processed (counter, by status: success/failure)
- Processing duration (histogram)
- Queue depth (gauge — most brokers expose this)
- Consumer lag (gauge — time between message publish and processing)
- In-flight messages (gauge — currently being processed)
- DLQ size (gauge)
Alert on growing queue depth, increasing consumer lag, or a non-zero DLQ size.
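The worker-side metrics are easy to wire with Prometheus client_golang; queue depth, consumer lag, and DLQ size usually come from the broker's own exporter instead. A sketch of the worker-side half (the metric names are illustrative):

var (
    messagesProcessed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "worker_messages_processed_total",
        Help: "Messages processed, labelled by outcome.",
    }, []string{"status"})

    processingDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "worker_processing_duration_seconds",
        Help:    "Time spent in the handler per message.",
        Buckets: prometheus.DefBuckets,
    })

    inFlight = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "worker_in_flight_messages",
        Help: "Messages currently being processed.",
    })
)

func init() {
    prometheus.MustRegister(messagesProcessed, processingDuration, inFlight)
}

Increment inFlight before the handler runs, observe the duration after it returns, and count the outcome with messagesProcessed.WithLabelValues("success") or ("failure").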
Testing Workers
Use an in-memory queue for unit tests:
func TestWorkerProcessesMessages(t *testing.T) {
    queue := NewInMemoryQueue()

    var processed []string
    worker := NewWorker(WorkerConfig{
        Consumer: queue,
        Handler: func(ctx context.Context, body []byte) error {
            processed = append(processed, string(body))
            return nil
        },
        Concurrency: 1,
    })

    queue.Publish("message-1")
    queue.Publish("message-2")

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    worker.Run(ctx)

    assert.Equal(t, []string{"message-1", "message-2"}, processed)
}
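NewInMemoryQueue isn't shown either. A toy version that satisfies the Consumer shape sketched earlier is enough; it's deliberately minimal and not meant to model a real broker:

type InMemoryQueue struct {
    ch chan Message
}

func NewInMemoryQueue() *InMemoryQueue {
    return &InMemoryQueue{ch: make(chan Message, 128)}
}

// Publish enqueues a message body; tests call this to seed work.
func (q *InMemoryQueue) Publish(body string) {
    q.ch <- &testMessage{body: []byte(body), deliveries: 1}
}

// Receive blocks until a message is available or ctx is cancelled,
// matching the Consumer contract the worker expects.
func (q *InMemoryQueue) Receive(ctx context.Context) (Message, error) {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case msg := <-q.ch:
        return msg, nil
    }
}

// testMessage is the bare minimum needed to satisfy Message in tests.
type testMessage struct {
    body       []byte
    deliveries int
}

func (m *testMessage) ID() string         { return "test" }
func (m *testMessage) Body() []byte       { return m.body }
func (m *testMessage) DeliveryCount() int { return m.deliveries }
func (m *testMessage) Ack() error         { return nil }
func (m *testMessage) Nack() error        { return nil }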
Resilient workers aren’t complicated. They’re disciplined: handle shutdown, handle poison messages, handle overload, and make everything observable.