Designing a Distributed Task Queue in Go from Scratch
Most task queue libraries are either too simple for production or too complex to debug. I built one from scratch in Go to understand the problem space deeply. Here’s the architecture and the decisions behind it.
Why Build Your Own?
I didn’t build this for production use — we use well-tested solutions for that. I built it to understand the internals: how do you distribute work fairly? How do you handle failures without losing tasks? How do you scale horizontally?
The answers are more nuanced than “just use Redis and BRPOPLPUSH.”
Core Architecture
The system has three components:
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer │────▶│  Broker  │────▶│  Worker  │
│ (enqueue)│     │ (Redis)  │     │ (dequeue)│
└──────────┘     └────┬─────┘     └──────────┘
                      │
               ┌──────┴──────┐
               │ Dead Letter │
               │    Queue    │
               └─────────────┘
The broker is Redis. Producers enqueue tasks. Workers dequeue and process them. Failed tasks eventually land in a dead-letter queue.
Task Representation
type Task struct {
	ID         string          `json:"id"`
	Queue      string          `json:"queue"`
	Payload    json.RawMessage `json:"payload"`
	Priority   int             `json:"priority"`
	MaxRetry   int             `json:"max_retry"`
	RetryCount int             `json:"retry_count"`
	CreatedAt  time.Time       `json:"created_at"`
	ProcessAt  time.Time       `json:"process_at"`
	Deadline   time.Time       `json:"deadline"`
}
Every task has a unique ID (for idempotency), a priority level, retry limits, and an optional scheduled time for delayed execution.
Enqueuing with Priority
Redis sorted sets give us priority queues for free. The score is a combination of priority and timestamp:
func (b *Broker) Enqueue(ctx context.Context, task *Task) error {
	data, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task: %w", err)
	}
	// Negate the priority term so a higher Priority yields a lower score
	// (ZPOPMIN pops the lowest score first). 1e13 dwarfs any millisecond
	// timestamp, so priority always dominates the ordering.
	score := -float64(task.Priority)*1e13 + float64(task.ProcessAt.UnixMilli())
	return b.rdb.ZAdd(ctx, "queue:"+task.Queue, redis.Z{
		Score:  score,
		Member: data,
	}).Err()
}
Higher priority tasks get lower scores (processed first). Within the same priority, earlier tasks are processed first. This gives us a stable priority queue with FIFO ordering within each priority level.
Dequeuing with Visibility Timeout
The hardest part: how do you dequeue a task atomically so no two workers process the same task?
func (b *Broker) Dequeue(ctx context.Context, queue string, timeout time.Duration) (*Task, error) {
	// Atomic: pop the lowest-scored task from pending, add it to the
	// processing set with its visibility deadline as the score.
	script := redis.NewScript(`
		local task = redis.call('ZPOPMIN', KEYS[1])
		if #task == 0 then return nil end
		redis.call('ZADD', KEYS[2], ARGV[1], task[1])
		return task[1]
	`)
	visibilityDeadline := float64(time.Now().Add(timeout).UnixMilli())
	result, err := script.Run(ctx, b.rdb, []string{
		"queue:" + queue,
		"processing:" + queue,
	}, visibilityDeadline).Text()
	if err == redis.Nil {
		return nil, ErrNoTask
	}
	if err != nil {
		return nil, fmt.Errorf("dequeue: %w", err)
	}
	var task Task
	if err := json.Unmarshal([]byte(result), &task); err != nil {
		return nil, fmt.Errorf("unmarshal task: %w", err)
	}
	return &task, nil
}
The Lua script ensures atomicity. The task moves from the pending queue to a processing set with a visibility deadline. If the worker crashes, a reaper goroutine moves expired tasks back to the pending queue.
The Reaper: Recovering Lost Tasks
func (b *Broker) startReaper(ctx context.Context, queue string) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			now := float64(time.Now().UnixMilli())
			stale, err := b.rdb.ZRangeByScore(ctx, "processing:"+queue, &redis.ZRangeBy{
				Min: "-inf",
				Max: fmt.Sprintf("%f", now),
			}).Result()
			if err != nil {
				slog.Error("reaper scan failed", "queue", queue, "error", err)
				continue
			}
			for _, raw := range stale {
				var task Task
				if err := json.Unmarshal([]byte(raw), &task); err != nil {
					slog.Error("reaper: corrupt task", "queue", queue, "error", err)
					b.rdb.ZRem(ctx, "processing:"+queue, raw)
					continue
				}
				task.RetryCount++
				if task.RetryCount > task.MaxRetry {
					b.moveToDLQ(ctx, queue, raw)
				} else {
					b.Enqueue(ctx, &task)
				}
				// Re-enqueue before removing: a crash between the two calls
				// means a duplicate, not a lost task (at-least-once delivery).
				b.rdb.ZRem(ctx, "processing:"+queue, raw)
			}
		}
	}
}
Every 30 seconds, the reaper scans the processing set for tasks past their visibility deadline. If retries remain, re-enqueue. Otherwise, move to the dead-letter queue.
Worker Implementation
type Worker struct {
	broker   *Broker
	handlers map[string]HandlerFunc
	pool     int
}

type HandlerFunc func(ctx context.Context, payload json.RawMessage) error

func (w *Worker) Start(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < w.pool; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			w.loop(ctx, id)
		}(i)
	}
	wg.Wait()
}

func (w *Worker) loop(ctx context.Context, id int) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		task, err := w.broker.Dequeue(ctx, "default", 5*time.Minute)
		if errors.Is(err, ErrNoTask) {
			time.Sleep(time.Second)
			continue
		}
		if err != nil {
			slog.Error("dequeue failed", "worker", id, "error", err)
			time.Sleep(time.Second)
			continue
		}
		handler, ok := w.handlers[task.Queue]
		if !ok {
			// Without this check, a missing handler is a nil-map hit and panics.
			slog.Error("no handler for queue", "queue", task.Queue, "task_id", task.ID)
			w.broker.Retry(ctx, task)
			continue
		}
		taskCtx, cancel := context.WithDeadline(ctx, task.Deadline)
		err = handler(taskCtx, task.Payload)
		cancel()
		if err != nil {
			slog.Error("task failed", "task_id", task.ID, "error", err)
			w.broker.Retry(ctx, task)
		} else {
			w.broker.Ack(ctx, task)
		}
	}
}
A worker runs a pool of goroutines. Each goroutine polls for tasks, processes them under a deadline context, and either acknowledges the task or schedules a retry on failure.
Lessons Learned
1. Visibility timeout must be longer than your longest task. If a task takes 10 minutes but your visibility timeout is 5 minutes, the reaper will re-enqueue it while it’s still running. Now two workers process the same task.
2. Lua scripts in Redis are your friend. Any operation that needs to be atomic across multiple keys should be a Lua script. Redis executes Lua scripts atomically.
3. Dead-letter queues are essential. Without a DLQ, poison messages (tasks that always fail) will retry forever, consuming resources and polluting logs.
4. Metrics matter more than you think. Queue depth, processing latency, failure rate, DLQ size — these are the numbers that tell you if your system is healthy.
5. Horizontal scaling is straightforward. Because dequeue is atomic, you can run as many workers as you want. They’ll compete for tasks safely.
What I’d Do Differently
If I were building this for production:
- Use a proper message broker (RabbitMQ, SQS, or NATS JetStream) instead of Redis
- Add task deduplication using a Bloom filter or Redis SET
- Implement exponential backoff for retries instead of immediate re-enqueue
- Add task dependencies (task B runs only after task A succeeds)
- Support task cancellation via the context
Building a task queue from scratch taught me more about distributed systems than reading any textbook. The edge cases — duplicate processing, lost tasks, poison messages, ordering guarantees — are where the real complexity lives.