
Designing a Distributed Task Queue in Go from Scratch

Most task queue libraries are either too simple for production or too complex to debug. I built one from scratch in Go to understand the problem space deeply. Here’s the architecture and the decisions behind it.

Why Build Your Own?

I didn’t build this for production use — we use well-tested solutions for that. I built it to understand the internals: how do you distribute work fairly? How do you handle failures without losing tasks? How do you scale horizontally?

The answers are more nuanced than “just use Redis and BRPOPLPUSH.”

Core Architecture

The system has three components:

┌───────────┐     ┌───────────┐     ┌───────────┐
│ Producer  │────▶│  Broker   │────▶│  Worker   │
│ (enqueue) │     │  (Redis)  │     │ (dequeue) │
└───────────┘     └─────┬─────┘     └───────────┘
                        │
                  ┌─────▼──────┐
                  │ Dead Letter│
                  │   Queue    │
                  └────────────┘

The broker is Redis. Producers enqueue tasks. Workers dequeue and process them. Failed tasks eventually land in a dead-letter queue.

Task Representation

type Task struct {
    ID         string          `json:"id"`
    Queue      string          `json:"queue"`
    Payload    json.RawMessage `json:"payload"`
    Priority   int             `json:"priority"`
    MaxRetry   int             `json:"max_retry"`
    RetryCount int             `json:"retry_count"`
    CreatedAt  time.Time       `json:"created_at"`
    ProcessAt  time.Time       `json:"process_at"`
    Deadline   time.Time       `json:"deadline"`
}

Every task has a unique ID (for idempotency), a priority level, retry limits, and an optional scheduled time for delayed execution.

Enqueuing with Priority

Redis sorted sets give us priority queues for free. The score is a combination of priority and timestamp:

func (b *Broker) Enqueue(ctx context.Context, task *Task) error {
    data, err := json.Marshal(task)
    if err != nil {
        return fmt.Errorf("marshal task: %w", err)
    }

    // Negate priority so higher-priority tasks get lower scores
    // (ZPOPMIN pops the lowest score first). The 1e13 ms multiplier
    // keeps priority levels from overlapping on the timestamp.
    score := -float64(task.Priority)*1e13 + float64(task.ProcessAt.UnixMilli())

    return b.rdb.ZAdd(ctx, "queue:"+task.Queue, redis.Z{
        Score:  score,
        Member: data,
    }).Err()
}

Higher priority tasks get lower scores (processed first). Within the same priority, earlier tasks are processed first. This gives us a stable priority queue with FIFO ordering within each priority level.

Dequeuing with Visibility Timeout

The hardest part: how do you dequeue a task atomically so no two workers process the same task?

func (b *Broker) Dequeue(ctx context.Context, queue string, timeout time.Duration) (*Task, error) {
    // Atomic: pop lowest score from pending, add to processing set
    script := redis.NewScript(`
        local task = redis.call('ZPOPMIN', KEYS[1])
        if #task == 0 then return nil end
        redis.call('ZADD', KEYS[2], ARGV[1], task[1])
        return task[1]
    `)

    visibilityDeadline := float64(time.Now().Add(timeout).UnixMilli())
    result, err := script.Run(ctx, b.rdb, []string{
        "queue:" + queue,
        "processing:" + queue,
    }, visibilityDeadline).Text()

    if errors.Is(err, redis.Nil) {
        return nil, ErrNoTask
    }
    if err != nil {
        return nil, fmt.Errorf("dequeue: %w", err)
    }

    var task Task
    if err := json.Unmarshal([]byte(result), &task); err != nil {
        return nil, fmt.Errorf("unmarshal task: %w", err)
    }
    return &task, nil
}

The Lua script ensures atomicity. The task moves from the pending queue to a processing set with a visibility deadline. If the worker crashes, a reaper goroutine moves expired tasks back to the pending queue.

The Reaper: Recovering Lost Tasks

func (b *Broker) startReaper(ctx context.Context, queue string) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            now := float64(time.Now().UnixMilli())
            stale, err := b.rdb.ZRangeByScore(ctx, "processing:"+queue, &redis.ZRangeBy{
                Min: "-inf",
                Max: fmt.Sprintf("%f", now),
            }).Result()
            if err != nil {
                slog.Error("reaper scan failed", "queue", queue, "error", err)
                continue
            }

            for _, raw := range stale {
                var task Task
                if err := json.Unmarshal([]byte(raw), &task); err != nil {
                    // Unparseable entries can never succeed; park them in the DLQ.
                    b.moveToDLQ(ctx, queue, raw)
                    b.rdb.ZRem(ctx, "processing:"+queue, raw)
                    continue
                }
                task.RetryCount++

                if task.RetryCount > task.MaxRetry {
                    b.moveToDLQ(ctx, queue, raw)
                } else if err := b.Enqueue(ctx, &task); err != nil {
                    // Leave the task in processing; the next sweep retries it.
                    slog.Error("re-enqueue failed", "task_id", task.ID, "error", err)
                    continue
                }
                // Only remove from processing once the move succeeded,
                // so a failed hand-off can't lose the task.
                b.rdb.ZRem(ctx, "processing:"+queue, raw)
            }
        }
    }
}

Every 30 seconds, the reaper scans the processing set for tasks past their visibility deadline. If retries remain, re-enqueue. Otherwise, move to the dead-letter queue.
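The reaper and the worker's failure path both make the same choice: bump RetryCount, then requeue or dead-letter. Factoring that decision into a pure helper (hypothetical; the post keeps it inline) makes the policy easy to unit-test:

```go
package main

import "fmt"

// Disposition is what to do with a failed or expired task.
type Disposition int

const (
	Requeue Disposition = iota
	DeadLetter
)

// nextDisposition bumps the retry count and decides the task's fate,
// mirroring the reaper's `RetryCount > MaxRetry` check.
func nextDisposition(retryCount, maxRetry int) (int, Disposition) {
	retryCount++
	if retryCount > maxRetry {
		return retryCount, DeadLetter
	}
	return retryCount, Requeue
}

func main() {
	count := 0
	for attempt := 1; ; attempt++ {
		var d Disposition
		count, d = nextDisposition(count, 3)
		fmt.Printf("attempt %d: dead-letter=%v\n", attempt, d == DeadLetter)
		if d == DeadLetter {
			break
		}
	}
	// With MaxRetry=3 the task is dead-lettered on the 4th failure.
}
```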

Worker Implementation

type Worker struct {
    broker   *Broker
    handlers map[string]HandlerFunc
    pool     int
}

type HandlerFunc func(ctx context.Context, payload json.RawMessage) error

func (w *Worker) Start(ctx context.Context) {
    var wg sync.WaitGroup
    for i := 0; i < w.pool; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            w.loop(ctx, id)
        }(i)
    }
    wg.Wait()
}

func (w *Worker) loop(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        task, err := w.broker.Dequeue(ctx, "default", 5*time.Minute)
        if errors.Is(err, ErrNoTask) {
            time.Sleep(time.Second)
            continue
        }
        if err != nil {
            slog.Error("dequeue failed", "worker", id, "error", err)
            time.Sleep(time.Second)
            continue
        }

        handler, ok := w.handlers[task.Queue]
        if !ok {
            // No registered handler: treat as a failure so retries/DLQ apply.
            slog.Error("no handler registered", "queue", task.Queue)
            w.broker.Retry(ctx, task)
            continue
        }

        taskCtx, cancel := context.WithDeadline(ctx, task.Deadline)
        err = handler(taskCtx, task.Payload)
        cancel()

        if err != nil {
            slog.Error("task failed", "task_id", task.ID, "error", err)
            w.broker.Retry(ctx, task)
        } else {
            w.broker.Ack(ctx, task)
        }
    }
}

Each worker runs a pool of goroutines. Each goroutine polls for tasks, processes them with a deadline context, and either acknowledges or retries on failure.

Lessons Learned

1. Visibility timeout must be longer than your longest task. If a task takes 10 minutes but your visibility timeout is 5 minutes, the reaper will re-enqueue it while it’s still running. Now two workers process the same task.

2. Lua scripts in Redis are your friend. Any operation that needs to be atomic across multiple keys should be a Lua script. Redis executes Lua scripts atomically.

3. Dead-letter queues are essential. Without a DLQ, poison messages (tasks that always fail) will retry forever, consuming resources and polluting logs.

4. Metrics matter more than you think. Queue depth, processing latency, failure rate, DLQ size — these are the numbers that tell you if your system is healthy.

5. Horizontal scaling is straightforward. Because dequeue is atomic, you can run as many workers as you want. They’ll compete for tasks safely.

What I’d Do Differently

If I were building this for production:

  • Use a proper message broker (RabbitMQ, SQS, or NATS JetStream) instead of Redis
  • Add task deduplication using a Bloom filter or Redis SET
  • Implement exponential backoff for retries instead of immediate re-enqueue
  • Add task dependencies (task B runs only after task A succeeds)
  • Support task cancellation via the context

Building a task queue from scratch taught me more about distributed systems than reading any textbook. The edge cases — duplicate processing, lost tasks, poison messages, ordering guarantees — are where the real complexity lives.