
Building Resilient Workers in Go for Queue-Based Architectures

Queue-based workers are the workhorses of backend systems. They process payments, send emails, generate reports, sync data. When they break, things pile up silently until someone notices a million unprocessed messages.

Here’s how I build workers that don’t break.

The Worker Skeleton

Every worker I build follows this structure:

type Worker struct {
    consumer    Consumer
    handler     Handler
    concurrency int
    logger      *slog.Logger
}

// Run starts `concurrency` consumer goroutines; errgroup is golang.org/x/sync/errgroup.
func (w *Worker) Run(ctx context.Context) error {
    g, ctx := errgroup.WithContext(ctx)

    for i := 0; i < w.concurrency; i++ {
        id := i // capture the loop variable (required before Go 1.22)
        g.Go(func() error {
            return w.loop(ctx, id)
        })
    }

    return g.Wait()
}

func (w *Worker) loop(ctx context.Context, id int) error {
    w.logger.Info("worker started", "worker_id", id)
    defer w.logger.Info("worker stopped", "worker_id", id)

    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }

        msg, err := w.consumer.Receive(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return nil // Shutting down
            }
            w.logger.Error("receive failed", "error", err)
            time.Sleep(time.Second)
            continue
        }

        w.process(ctx, msg)
    }
}

Graceful Shutdown

The most critical pattern. When a deploy happens, SIGTERM arrives. You must:

  1. Stop accepting new messages
  2. Finish processing in-flight messages
  3. Exit cleanly

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    worker := NewWorker(config)

    slog.Info("starting worker")
    if err := worker.Run(ctx); err != nil {
        slog.Error("worker failed", "error", err)
        os.Exit(1)
    }
    slog.Info("worker shutdown complete")
}

When ctx is cancelled, consumer.Receive returns an error, and each goroutine’s loop exits after finishing its current message. errgroup’s Wait then blocks until every goroutine has returned.

Add a hard deadline for stubborn messages:

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    worker := NewWorker(config)

    done := make(chan struct{})
    go func() {
        if err := worker.Run(ctx); err != nil {
            slog.Error("worker failed", "error", err)
        }
        close(done)
    }()

    <-ctx.Done()
    slog.Info("shutdown signal received")

    select {
    case <-done:
        slog.Info("clean shutdown")
    case <-time.After(30 * time.Second):
        slog.Error("forced shutdown after 30s timeout")
        os.Exit(1)
    }
}

Poison Message Handling

A poison message is one that always causes the handler to fail. Without detection, it loops forever: dequeue → fail → requeue → dequeue → fail.

func (w *Worker) process(ctx context.Context, msg Message) {
    deliveryCount := msg.DeliveryCount()

    if deliveryCount > 5 {
        w.logger.Error("poison message detected",
            "message_id", msg.ID(),
            "delivery_count", deliveryCount,
        )
        if err := w.deadLetter(ctx, msg, "max delivery count exceeded"); err != nil {
            w.logger.Error("dead letter failed", "message_id", msg.ID(), "error", err)
            msg.Nack() // keep the message rather than lose it
            return
        }
        msg.Ack()
        return
    }

    if err := w.handler(ctx, msg.Body()); err != nil {
        w.logger.Error("handler failed",
            "message_id", msg.ID(),
            "delivery_count", deliveryCount,
            "error", err,
        )
        msg.Nack()
        return
    }

    msg.Ack()
}

Most message brokers track delivery count. Use it.

Backpressure

When your handler is slow (maybe the database is under load), you need to slow down consumption rather than buffering messages in memory:

type RateLimitedWorker struct {
    worker  *Worker
    limiter *rate.Limiter
}

func (w *RateLimitedWorker) loop(ctx context.Context, id int) error {
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
        }

        // Wait for rate limit token
        if err := w.limiter.Wait(ctx); err != nil {
            return nil
        }

        msg, err := w.worker.consumer.Receive(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return nil // Shutting down
            }
            time.Sleep(time.Second)
            continue
        }

        w.worker.process(ctx, msg)
    }
}

This caps throughput at a sustainable rate. If processing gets slower, messages stay in the queue where they’re safe, rather than piling up in memory.
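Rate limiting caps throughput at a fixed number; a complementary form of backpressure is bounding how many messages are in flight at once, so consumption automatically slows to match processing speed. A minimal channel-based semaphore sketch (this helper is mine, not part of the worker above; golang.org/x/sync/semaphore offers a weighted, context-aware equivalent):

```go
package main

// Semaphore bounds concurrent in-flight messages: Acquire blocks when
// the limit is reached, so the receive loop stalls instead of buffering.
type Semaphore chan struct{}

// NewSemaphore creates a semaphore allowing `limit` concurrent holders.
func NewSemaphore(limit int) Semaphore { return make(Semaphore, limit) }

// Acquire blocks until a slot is free.
func (s Semaphore) Acquire() { s <- struct{}{} }

// TryAcquire takes a slot without blocking, reporting whether it succeeded.
func (s Semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// Release frees a slot taken by Acquire or TryAcquire.
func (s Semaphore) Release() { <-s }
```

In the loop, Acquire before Receive and Release when process returns; a full queue of slots means every worker slot is busy and no new messages are pulled.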

Health Checks

Health checks aren’t just for HTTP services; workers need them too:

type WorkerHealth struct {
    mu            sync.Mutex
    lastProcessed time.Time
}

func (h *WorkerHealth) RecordProcessed() {
    h.mu.Lock()
    h.lastProcessed = time.Now()
    h.mu.Unlock()
}

func (h *WorkerHealth) IsHealthy() bool {
    h.mu.Lock()
    defer h.mu.Unlock()

    // Unhealthy if no messages have been processed in 5 minutes
    // while we're supposed to be consuming.
    return time.Since(h.lastProcessed) <= 5*time.Minute
}

Expose this via a simple HTTP endpoint so Kubernetes liveness probes can detect stuck workers:

go func() {
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if health.IsHealthy() {
            w.WriteHeader(http.StatusOK)
        } else {
            w.WriteHeader(http.StatusServiceUnavailable)
        }
    })
    if err := http.ListenAndServe(":8081", nil); err != nil {
        slog.Error("health server failed", "error", err)
    }
}()

Metrics

The metrics that tell you everything:

  • Messages processed (counter, by status: success/failure)
  • Processing duration (histogram)
  • Queue depth (gauge — most brokers expose this)
  • Consumer lag (gauge — time between message publish and processing)
  • In-flight messages (gauge — currently being processed)
  • DLQ size (gauge)

Alert on queue depth growing, consumer lag increasing, or DLQ size non-zero.

Testing Workers

Use an in-memory queue for unit tests:

func TestWorkerProcessesMessages(t *testing.T) {
    queue := NewInMemoryQueue()
    var processed []string

    worker := NewWorker(WorkerConfig{
        Consumer: queue,
        Handler: func(ctx context.Context, body []byte) error {
            processed = append(processed, string(body))
            return nil
        },
        Concurrency: 1,
    })

    queue.Publish("message-1")
    queue.Publish("message-2")

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    worker.Run(ctx)

    assert.Equal(t, []string{"message-1", "message-2"}, processed)
}

Resilient workers aren’t complicated. They’re disciplined: handle shutdown, handle poison messages, handle overload, and make everything observable.