Reliable Background Jobs in Go: Retries, Dead Letters, and Idempotency
Background jobs seem simple until they aren’t. “Process this later” becomes “process this later, exactly once, with retries, timeout handling, priority scheduling, and a way to debug failures at 3 AM.”
Here’s how I build reliable background job systems in Go.
The Core Interface
type Job struct {
	ID         string          `json:"id"`
	Type       string          `json:"type"`
	Payload    json.RawMessage `json:"payload"`
	Priority   int             `json:"priority"`
	MaxRetry   int             `json:"max_retry"`
	RetryCount int             `json:"retry_count"`
	RunAt      time.Time       `json:"run_at"`
	Timeout    time.Duration   `json:"timeout"`
}

type Handler func(ctx context.Context, payload json.RawMessage) error

type JobProcessor struct {
	handlers    map[string]Handler
	queue       Queue
	dlq         Queue
	redis       *redis.Client // go-redis client, used for idempotency keys below
	concurrency int           // number of worker goroutines
}
Every job has a type, a payload, retry limits, and an optional scheduled time. Handlers are registered by job type.
Exponential Backoff with Jitter
Immediate retries are almost always wrong. If a job failed because a downstream service is overloaded, retrying instantly makes things worse.
func backoffDelay(retryCount int) time.Duration {
	base := time.Second * time.Duration(math.Pow(2, float64(retryCount)))
	if base > 30*time.Minute {
		base = 30 * time.Minute
	}
	// Add jitter: ±25% of base delay
	jitter := time.Duration(rand.Int63n(int64(base) / 2))
	return base + jitter - (base / 4)
}
Retry 1 waits ~2s, retry 2 ~4s, retry 3 ~8s, and the delay caps at 30 minutes. The jitter spreads retries out so that multiple workers don't thundering-herd the downstream service.
Idempotent Handlers
Jobs will be delivered at least once. Your handler must be safe to run multiple times with the same input.
func (p *JobProcessor) processWithIdempotency(ctx context.Context, job Job) error {
	// Look up the handler first: if it fails after SetNX, the idempotency
	// key would stay set and every retry would silently no-op.
	handler, ok := p.handlers[job.Type]
	if !ok {
		return fmt.Errorf("no handler for job type: %s", job.Type)
	}

	// Check if already processed
	key := fmt.Sprintf("job:processed:%s", job.ID)
	set, err := p.redis.SetNX(ctx, key, "1", 7*24*time.Hour).Result()
	if err != nil {
		return fmt.Errorf("idempotency check: %w", err)
	}
	if !set {
		slog.Info("job already processed", "job_id", job.ID)
		return nil // Already done
	}

	if err := handler(ctx, job.Payload); err != nil {
		// Remove the idempotency key so a retry can attempt again
		p.redis.Del(ctx, key)
		return err
	}
	return nil
}
The idempotency key is set before processing and removed on failure, so successful jobs are never reprocessed while failed jobs can be retried. One caveat: if the worker crashes after SetNX but before the handler finishes, the key lingers until its TTL expires and retries of that job are silently skipped, so for high-stakes work the handler itself should also be idempotent.
Dead-Letter Queue
After exhausting retries, jobs go to the DLQ — not silently dropped.
func (p *JobProcessor) handleFailure(ctx context.Context, job Job, err error) {
	job.RetryCount++
	if job.RetryCount > job.MaxRetry {
		slog.Error("job exhausted retries, moving to DLQ",
			"job_id", job.ID,
			"type", job.Type,
			"retries", job.RetryCount,
			"last_error", err.Error(),
		)
		dlqJob := DLQEntry{
			Job:      job,
			Error:    err.Error(),
			FailedAt: time.Now(),
		}
		if dlqErr := p.dlq.Enqueue(ctx, dlqJob); dlqErr != nil {
			// Losing a DLQ entry defeats the point; at minimum, log it loudly.
			slog.Error("failed to enqueue to DLQ", "job_id", job.ID, "error", dlqErr)
		}
		return
	}

	// Schedule retry with backoff
	delay := backoffDelay(job.RetryCount)
	job.RunAt = time.Now().Add(delay)
	slog.Warn("retrying job",
		"job_id", job.ID,
		"retry", job.RetryCount,
		"delay", delay,
	)
	if reqErr := p.queue.Enqueue(ctx, job); reqErr != nil {
		slog.Error("failed to re-enqueue job", "job_id", job.ID, "error", reqErr)
	}
}
We build a simple admin endpoint to inspect and replay DLQ entries:
func (p *JobProcessor) ReplayDLQ(ctx context.Context, jobID string) error {
	entry, err := p.dlq.Get(ctx, jobID)
	if err != nil {
		return err
	}
	entry.Job.RetryCount = 0
	entry.Job.RunAt = time.Now()
	return p.queue.Enqueue(ctx, entry.Job)
}
Timeout Enforcement
A handler that hangs forever blocks a worker. Always enforce timeouts:
func (p *JobProcessor) execute(ctx context.Context, job Job) error {
	timeout := job.Timeout
	if timeout == 0 {
		timeout = 5 * time.Minute // Default timeout
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() {
		done <- p.processWithIdempotency(ctx, job)
	}()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		// The handler goroutine is still running; it only exits once it
		// observes ctx cancellation, so handlers must check ctx or they leak.
		return fmt.Errorf("job timed out after %s", timeout)
	}
}
Graceful Shutdown
When deploying, you don’t want to kill in-flight jobs. Wait for them to finish:
func (p *JobProcessor) Start(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < p.concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.workerLoop(ctx)
		}()
	}
	<-ctx.Done()
	slog.Info("shutting down, waiting for in-flight jobs...")
	wg.Wait()
	slog.Info("all workers stopped")
}
The ctx comes from signal handling in main:
func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()
	processor.Start(ctx)
}
When SIGTERM arrives, the context is cancelled and workers stop polling for new jobs, while in-flight jobs finish naturally. Note that this only holds if the worker loop checks ctx between jobs; if the shutdown context is passed straight into handlers, cancellation interrupts in-flight work too.
Monitoring
The metrics that matter:
var (
	jobsProcessed = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "jobs_processed_total"},
		[]string{"type", "status"},
	)
	jobDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "job_duration_seconds"},
		[]string{"type"},
	)
	queueDepth = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "queue_depth"},
		[]string{"queue"},
	)
	dlqSize = prometheus.NewGauge(
		prometheus.GaugeOpts{Name: "dlq_size"},
	)
)
Alert on:
- Queue depth growing (consumers can’t keep up)
- DLQ size growing (poison jobs accumulating)
- P99 job duration exceeding its threshold
- Error rate exceeding threshold
Checklist for Production
- Exponential backoff with jitter on retries
- Idempotent handlers (safe to reprocess)
- Dead-letter queue for exhausted retries
- Timeout enforcement on every job
- Graceful shutdown (wait for in-flight jobs)
- Structured logging with job ID context
- Metrics: throughput, latency, error rate, queue depth, DLQ size
- Admin endpoint to inspect and replay DLQ entries
Background jobs are infrastructure. Treat them with the same rigor as your API endpoints.