Production Patterns for Event-Driven Systems in Go
Event-driven systems have a reputation for being hard to operate. Most of the difficulty comes from missing patterns that should be table stakes. Here are the patterns I always implement.
Transactional Outbox
The classic problem: you need to update the database AND publish an event. If you do them separately, one can succeed while the other fails.
// DON'T: two separate operations
func (s *Service) CreateOrder(ctx context.Context, order Order) error {
	if err := s.db.InsertOrder(ctx, order); err != nil {
		return err
	}
	// What if this fails? DB has the order, but no event was published
	return s.publisher.Publish(ctx, OrderCreatedEvent{...})
}
The outbox pattern: write the event to a database table in the same transaction as your data change. A separate process reads the outbox and publishes.
func (s *Service) CreateOrder(ctx context.Context, order Order) error {
	return s.db.WithTx(ctx, func(tx pgx.Tx) error {
		if err := insertOrder(ctx, tx, order); err != nil {
			return err
		}
		event := OutboxEvent{
			ID:        uuid.New().String(),
			Type:      "OrderCreated",
			Payload:   marshal(OrderCreatedEvent{OrderID: order.ID}),
			CreatedAt: time.Now(),
		}
		return insertOutboxEvent(ctx, tx, event)
	})
}
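Backing this takes only a small table. A minimal Postgres-flavored sketch (names and types here are illustrative, mirroring the `OutboxEvent` fields above):

```sql
-- Illustrative schema; adapt names and types to your database.
CREATE TABLE outbox_events (
    id           UUID PRIMARY KEY,
    type         TEXT NOT NULL,
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ  -- NULL until the relay publishes it
);

-- The relay polls for unpublished rows in insertion order.
CREATE INDEX outbox_unpublished_idx
    ON outbox_events (created_at)
    WHERE published_at IS NULL;
```

The partial index keeps the poll query cheap even after the table accumulates millions of published rows.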
The outbox relay polls the table and publishes:
func (r *OutboxRelay) Run(ctx context.Context) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			events, err := r.db.GetUnpublishedEvents(ctx, 100)
			if err != nil {
				slog.Error("fetch outbox events", "error", err)
				continue
			}
			for _, event := range events {
				if err := r.publisher.Publish(ctx, event); err != nil {
					slog.Error("publish event", "event_id", event.ID, "error", err)
					break // Stop; retry from this event next tick
				}
				if err := r.db.MarkPublished(ctx, event.ID); err != nil {
					// The event may be published again next tick: at-least-once, not exactly-once
					slog.Error("mark published", "event_id", event.ID, "error", err)
					break
				}
			}
		}
	}
}
This guarantees at-least-once delivery. Combined with idempotent consumers, you get effectively-once semantics.
Consumer Groups and Partitioning
For high-throughput topics, a single consumer can’t keep up. Use consumer groups with partitioned topics:
type PartitionedConsumer struct {
	group       string
	handler     Handler
	concurrency int
}

func (c *PartitionedConsumer) Start(ctx context.Context, topic string) error {
	consumer, err := kafka.NewConsumer(kafka.ConsumerConfig{
		GroupID: c.group,
		Topics:  []string{topic},
	})
	if err != nil {
		return err
	}
	g, ctx := errgroup.WithContext(ctx)
	for i := 0; i < c.concurrency; i++ {
		g.Go(func() error {
			for {
				msg, err := consumer.ReadMessage(ctx)
				if err != nil {
					if ctx.Err() != nil {
						return nil // shutting down
					}
					slog.Error("read message", "error", err)
					continue
				}
				if err := c.handler(ctx, msg.Value); err != nil {
					// Don't commit: the message is redelivered (or routed to a DLQ)
					slog.Error("handle message", "error", err)
					continue
				}
				if err := consumer.CommitMessages(ctx, msg); err != nil {
					slog.Error("commit message", "error", err)
				}
			}
		})
	}
	return g.Wait()
}
Key: partition your events by a natural key (customer ID, order ID). This guarantees ordering within a partition while allowing parallel processing across partitions.
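The routing itself is just consistent hashing of the key. This standalone sketch mimics what Kafka clients do when you set a message key (the exact hash function varies by client, so `partitionFor` is illustrative, not the real partitioner):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes a key onto a partition. The same key always lands on
// the same partition, which is what preserves per-customer ordering.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	p1 := partitionFor("customer-42", 12)
	p2 := partitionFor("customer-42", 12)
	fmt.Println(p1 == p2) // true: same key, same partition, ordered delivery
}
```

The corollary: changing the partition count remaps keys, so plan partition counts up front or expect a window of reordering when you resize.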
Event Replay
When you deploy a new consumer or fix a bug, you often need to reprocess historical events. Design for this from day one.
type ReplayableConsumer struct {
	consumer Consumer
	handler  Handler
	store    OffsetStore
}

func (c *ReplayableConsumer) ReplayFrom(ctx context.Context, timestamp time.Time) error {
	slog.Info("starting replay", "from", timestamp)
	offset, err := c.consumer.OffsetForTimestamp(ctx, timestamp)
	if err != nil {
		return err
	}
	if err := c.consumer.Seek(ctx, offset); err != nil {
		return err
	}
	count := 0
	for {
		msg, err := c.consumer.ReadMessage(ctx)
		if err != nil {
			return err // includes reaching the end of the stream or ctx cancellation
		}
		if err := c.handler(ctx, msg.Value); err != nil {
			slog.Error("replay handler error", "error", err)
			continue // Don't stop replay for individual failures
		}
		count++
		if count%10000 == 0 {
			slog.Info("replay progress", "processed", count)
		}
	}
}
Prerequisites:
- Events must be stored durably (Kafka retention, event store)
- Consumers must be idempotent (replayed events hit the same handlers)
- Have a way to distinguish replay traffic from live traffic in metrics
Schema Evolution
Events are a public API. You will need to change them without breaking consumers.
Strategy: always additive, never breaking.
// v1
type OrderCreated_v1 struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}

// v2 — added fields, didn't remove any
type OrderCreated_v2 struct {
	OrderID    string `json:"order_id"`
	Amount     int64  `json:"amount"`
	Currency   string `json:"currency"`    // New
	CustomerID string `json:"customer_id"` // New
}
Consumers that only know v1 ignore the new fields (standard JSON behavior). New consumers can use the new fields.
For breaking changes, publish a new event type entirely:
// Don't modify OrderCreated
// Instead, publish OrderCreatedV2 alongside OrderCreated during migration
// Then deprecate OrderCreated after all consumers migrate
Dead Letter Queue with Context
When events fail, preserve the full context for debugging:
type DeadLetterEntry struct {
	OriginalEvent Event     `json:"original_event"`
	Error         string    `json:"error"`
	StackTrace    string    `json:"stack_trace"`
	ConsumerGroup string    `json:"consumer_group"`
	Partition     int       `json:"partition"`
	Offset        int64     `json:"offset"`
	FailedAt      time.Time `json:"failed_at"`
	RetryCount    int       `json:"retry_count"`
}
Build a simple UI or CLI tool to inspect, replay, and purge DLQ entries. This is the most-used debugging tool for event-driven systems.
Monitoring Checklist
- Consumer lag per group — are consumers keeping up?
- Event publish rate — is the system producing events?
- Processing error rate — are handlers failing?
- DLQ size — are poison events accumulating?
- End-to-end latency — time from publish to consumer processing
- Outbox relay lag — time between DB write and event publish
These patterns aren’t optional — they’re the minimum for running event-driven systems in production. Skip any one of them and you’ll feel the pain within weeks.