← back to posts

Designing a High-Throughput Event Processing Pipeline in Go

We needed to process analytics events — millions per day, with sub-second latency, zero data loss, and the ability to fan out to multiple sinks (database, data warehouse, real-time dashboard).

Here’s the pipeline architecture in Go.

Pipeline Architecture

Ingestion → Buffer → Fan-out → Sink Writers
                                ├── PostgreSQL (batch insert)
                                ├── Kafka (streaming)
                                └── ClickHouse (analytics)

Each stage is connected by Go channels, creating a clean data flow with built-in backpressure.

Ingestion Layer

The entry point receives events over HTTP and pushes them into a buffered channel:

type Ingester struct {
    events chan<- Event
}

func (i *Ingester) Handler(w http.ResponseWriter, r *http.Request) {
    var batch []Event
    if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
        http.Error(w, "invalid payload", http.StatusBadRequest)
        return
    }

    for _, event := range batch {
        event.ReceivedAt = time.Now()
        select {
        case i.events <- event:
        default:
            // Channel full — backpressure
            http.Error(w, "pipeline overloaded", http.StatusServiceUnavailable)
            return
        }
    }

    w.WriteHeader(http.StatusAccepted)
}

The buffered channel acts as an in-memory queue. When it fills up, the API returns 503, signaling clients to back off.

Batching Stage

Writing one event at a time to a database is slow. Batch them:

type Batcher struct {
    input     <-chan Event
    output    chan<- []Event
    batchSize int
    flushInterval time.Duration
}

func (b *Batcher) Run(ctx context.Context) {
    batch := make([]Event, 0, b.batchSize)
    ticker := time.NewTicker(b.flushInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                b.output <- batch
            }
            return

        case event, ok := <-b.input:
            if !ok {
                if len(batch) > 0 {
                    b.output <- batch
                }
                return
            }
            batch = append(batch, event)
            if len(batch) >= b.batchSize {
                b.output <- batch
                batch = make([]Event, 0, b.batchSize)
            }

        case <-ticker.C:
            if len(batch) > 0 {
                b.output <- batch
                batch = make([]Event, 0, b.batchSize)
            }
        }
    }
}

Flush on size threshold OR time interval — whichever comes first. This ensures low-traffic periods don’t have unbounded latency.

Fan-Out

Send each batch to multiple sinks concurrently:

type FanOut struct {
    input <-chan []Event
    sinks []Sink
}

type Sink interface {
    Write(ctx context.Context, events []Event) error
    Name() string
}

func (f *FanOut) Run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case batch, ok := <-f.input:
            if !ok {
                return
            }

            var wg sync.WaitGroup
            for _, sink := range f.sinks {
                wg.Add(1)
                go func(s Sink) {
                    defer wg.Done()

                    sinkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
                    defer cancel()

                    if err := s.Write(sinkCtx, batch); err != nil {
                        slog.Error("sink write failed",
                            "sink", s.Name(),
                            "batch_size", len(batch),
                            "error", err,
                        )
                        // Don't block other sinks — log and continue
                    }
                }(sink)
            }
            wg.Wait()
        }
    }
}

Critical design decision: sink failures don’t block other sinks. If ClickHouse is down, PostgreSQL and Kafka still receive events.

Batch Insert Sink

The PostgreSQL sink uses COPY for maximum throughput:

type PostgresSink struct {
    pool *pgxpool.Pool
}

func (s *PostgresSink) Write(ctx context.Context, events []Event) error {
    _, err := s.pool.CopyFrom(ctx,
        pgx.Identifier{"events"},
        []string{"id", "type", "user_id", "payload", "created_at", "received_at"},
        pgx.CopyFromSlice(len(events), func(i int) ([]interface{}, error) {
            e := events[i]
            return []interface{}{
                e.ID, e.Type, e.UserID, e.Payload, e.CreatedAt, e.ReceivedAt,
            }, nil
        }),
    )
    return err
}

func (s *PostgresSink) Name() string { return "postgres" }

COPY is orders of magnitude faster than individual INSERT statements for bulk loading.

Wiring It Together

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    events := make(chan Event, 10_000)
    batches := make(chan []Event, 100)

    ingester := &Ingester{events: events}
    batcher := &Batcher{
        input:         events,
        output:        batches,
        batchSize:     500,
        flushInterval: time.Second,
    }
    fanout := &FanOut{
        input: batches,
        sinks: []Sink{
            &PostgresSink{pool: pgPool},
            &KafkaSink{producer: kafkaProducer},
            &ClickHouseSink{conn: chConn},
        },
    }

    g, ctx := errgroup.WithContext(ctx)
    g.Go(func() error { return runHTTP(ctx, ingester) })
    g.Go(func() error { batcher.Run(ctx); return nil })
    g.Go(func() error { fanout.Run(ctx); return nil })

    if err := g.Wait(); err != nil {
        slog.Error("pipeline error", "error", err)
    }
}

Each stage runs as a goroutine. Channels provide the backpressure mechanism — if sinks slow down, batches channel fills up, batcher slows down, events channel fills up, API returns 503.

Numbers

With this architecture on a single Go process:

  • Ingestion: 50K events/second
  • Batch insert: 10K events/second to PostgreSQL via COPY
  • End-to-end latency: P50 under 200ms, P99 under 2s
  • Memory: Stable at ~200MB regardless of throughput

Scaling horizontally is straightforward — run multiple instances behind a load balancer. Each instance is stateless.

Channels are Go’s killer feature for pipeline architectures. They give you concurrency, batching, buffering, and backpressure with clean, readable code.