← back to posts

Production Patterns for Event-Driven Systems in Go

Event-driven systems have a reputation for being hard to operate. Most of the difficulty comes from missing patterns that should be table stakes. Here are the patterns I always implement.

Transactional Outbox

The classic problem: you need to update the database AND publish an event. If you do them separately, one can succeed while the other fails.

// DON'T: two separate operations
func (s *Service) CreateOrder(ctx context.Context, order Order) error {
    if err := s.db.InsertOrder(ctx, order); err != nil {
        return err
    }
    // What if this fails? DB has the order, but no event was published
    return s.publisher.Publish(ctx, OrderCreatedEvent{...})
}

The outbox pattern: write the event to a database table in the same transaction as your data change. A separate process reads the outbox and publishes.

func (s *Service) CreateOrder(ctx context.Context, order Order) error {
    return s.db.WithTx(ctx, func(tx pgx.Tx) error {
        if err := insertOrder(ctx, tx, order); err != nil {
            return err
        }

        event := OutboxEvent{
            ID:        uuid.New().String(),
            Type:      "OrderCreated",
            Payload:   marshal(OrderCreatedEvent{OrderID: order.ID}),
            CreatedAt: time.Now(),
        }
        return insertOutboxEvent(ctx, tx, event)
    })
}

The outbox relay polls the table and publishes:

func (r *OutboxRelay) Run(ctx context.Context) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            events, err := r.db.GetUnpublishedEvents(ctx, 100)
            if err != nil {
                slog.Error("fetch outbox events", "error", err)
                continue
            }

            for _, event := range events {
                if err := r.publisher.Publish(ctx, event); err != nil {
                    slog.Error("publish event", "event_id", event.ID, "error", err)
                    break // Stop, retry from this event next tick
                }
                r.db.MarkPublished(ctx, event.ID)
            }
        }
    }
}

This guarantees at-least-once delivery. Combined with idempotent consumers, you get effectively-once semantics.

Consumer Groups and Partitioning

For high-throughput topics, a single consumer can’t keep up. Use consumer groups with partitioned topics:

type PartitionedConsumer struct {
    group       string
    handler     Handler
    concurrency int
}

func (c *PartitionedConsumer) Start(ctx context.Context, topic string) error {
    consumer, err := kafka.NewConsumer(kafka.ConsumerConfig{
        GroupID: c.group,
        Topics:  []string{topic},
    })
    if err != nil {
        return err
    }

    g, ctx := errgroup.WithContext(ctx)
    for i := 0; i < c.concurrency; i++ {
        g.Go(func() error {
            for {
                select {
                case <-ctx.Done():
                    return nil
                default:
                }

                msg, err := consumer.ReadMessage(ctx)
                if err != nil {
                    continue
                }

                if err := c.handler(ctx, msg.Value); err != nil {
                    slog.Error("handle message", "error", err)
                }
                consumer.CommitMessages(ctx, msg)
            }
        })
    }

    return g.Wait()
}

Key: partition your events by a natural key (customer ID, order ID). This guarantees ordering within a partition while allowing parallel processing across partitions.

Event Replay

When you deploy a new consumer or fix a bug, you often need to reprocess historical events. Design for this from day one.

type ReplayableConsumer struct {
    consumer Consumer
    handler  Handler
    store    OffsetStore
}

func (c *ReplayableConsumer) ReplayFrom(ctx context.Context, timestamp time.Time) error {
    slog.Info("starting replay", "from", timestamp)

    offset, err := c.consumer.OffsetForTimestamp(ctx, timestamp)
    if err != nil {
        return err
    }

    c.consumer.Seek(ctx, offset)

    count := 0
    for {
        msg, err := c.consumer.ReadMessage(ctx)
        if err != nil {
            return err
        }

        if err := c.handler(ctx, msg.Value); err != nil {
            slog.Error("replay handler error", "error", err)
            continue // Don't stop replay for individual failures
        }

        count++
        if count%10000 == 0 {
            slog.Info("replay progress", "processed", count)
        }
    }
}

Prerequisites:

  • Events must be stored durably (Kafka retention, event store)
  • Consumers must be idempotent (replayed events hit the same handlers)
  • Have a way to distinguish replay traffic from live traffic in metrics

Schema Evolution

Events are a public API. You will need to change them without breaking consumers.

Strategy: always additive, never breaking.

// v1
type OrderCreated_v1 struct {
    OrderID string `json:"order_id"`
    Amount  int64  `json:"amount"`
}

// v2 — added fields, didn't remove any
type OrderCreated_v2 struct {
    OrderID    string `json:"order_id"`
    Amount     int64  `json:"amount"`
    Currency   string `json:"currency"`    // New
    CustomerID string `json:"customer_id"` // New
}

Consumers that only know v1 ignore the new fields (standard JSON behavior). New consumers can use the new fields.

For breaking changes, publish a new event type entirely:

// Don't modify OrderCreated
// Instead, publish OrderCreatedV2 alongside OrderCreated during migration
// Then deprecate OrderCreated after all consumers migrate

Dead Letter Queue with Context

When events fail, preserve the full context for debugging:

type DeadLetterEntry struct {
    OriginalEvent Event     `json:"original_event"`
    Error         string    `json:"error"`
    StackTrace    string    `json:"stack_trace"`
    ConsumerGroup string    `json:"consumer_group"`
    Partition     int       `json:"partition"`
    Offset        int64     `json:"offset"`
    FailedAt      time.Time `json:"failed_at"`
    RetryCount    int       `json:"retry_count"`
}

Build a simple UI or CLI tool to inspect, replay, and purge DLQ entries. This is the most-used debugging tool for event-driven systems.

Monitoring Checklist

  • Consumer lag per group — are consumers keeping up?
  • Event publish rate — is the system producing events?
  • Processing error rate — are handlers failing?
  • DLQ size — are poison events accumulating?
  • End-to-end latency — time from publish to consumer processing
  • Outbox relay lag — time between DB write and event publish

These patterns aren’t optional — they’re the minimum for running event-driven systems in production. Skip any one of them and you’ll feel the pain within weeks.