Designing a High-Throughput Event Processing Pipeline in Go
We needed to process analytics events — millions per day, with sub-second latency, zero data loss, and the ability to fan out to multiple sinks (database, data warehouse, real-time dashboard).
Here’s the pipeline architecture in Go.
Pipeline Architecture
Ingestion → Buffer → Fan-out → Sink Writers
                                 ├── PostgreSQL (batch insert)
                                 ├── Kafka (streaming)
                                 └── ClickHouse (analytics)
Each stage is connected by Go channels, creating a clean data flow with built-in backpressure.
Ingestion Layer
The entry point receives events over HTTP and pushes them into a buffered channel:
type Ingester struct {
    events chan<- Event
}

func (i *Ingester) Handler(w http.ResponseWriter, r *http.Request) {
    var batch []Event
    if err := json.NewDecoder(r.Body).Decode(&batch); err != nil {
        http.Error(w, "invalid payload", http.StatusBadRequest)
        return
    }

    for _, event := range batch {
        event.ReceivedAt = time.Now()
        select {
        case i.events <- event:
        default:
            // Channel full — backpressure
            http.Error(w, "pipeline overloaded", http.StatusServiceUnavailable)
            return
        }
    }
    w.WriteHeader(http.StatusAccepted)
}
The buffered channel acts as an in-memory queue. When it fills up, the API returns 503, signaling clients to back off. One caveat: the 503 can fire partway through a batch, after earlier events were already enqueued, so a client that retries the whole batch will resend them. Sinks should be idempotent or deduplicate on event ID.
Batching Stage
Writing one event at a time to a database is slow. Batch them:
type Batcher struct {
    input         <-chan Event
    output        chan<- []Event
    batchSize     int
    flushInterval time.Duration
}

func (b *Batcher) Run(ctx context.Context) {
    batch := make([]Event, 0, b.batchSize)
    ticker := time.NewTicker(b.flushInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                b.output <- batch
            }
            return
        case event, ok := <-b.input:
            if !ok {
                if len(batch) > 0 {
                    b.output <- batch
                }
                return
            }
            batch = append(batch, event)
            if len(batch) >= b.batchSize {
                b.output <- batch
                batch = make([]Event, 0, b.batchSize)
            }
        case <-ticker.C:
            if len(batch) > 0 {
                b.output <- batch
                batch = make([]Event, 0, b.batchSize)
            }
        }
    }
}
Flush on size threshold OR time interval — whichever comes first. This ensures low-traffic periods don’t have unbounded latency.
Fan-Out
Send each batch to multiple sinks concurrently:
type FanOut struct {
    input <-chan []Event
    sinks []Sink
}

type Sink interface {
    Write(ctx context.Context, events []Event) error
    Name() string
}

func (f *FanOut) Run(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case batch, ok := <-f.input:
            if !ok {
                return
            }
            var wg sync.WaitGroup
            for _, sink := range f.sinks {
                wg.Add(1)
                go func(s Sink) {
                    defer wg.Done()
                    sinkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
                    defer cancel()
                    if err := s.Write(sinkCtx, batch); err != nil {
                        slog.Error("sink write failed",
                            "sink", s.Name(),
                            "batch_size", len(batch),
                            "error", err,
                        )
                        // Don't block other sinks — log and continue
                    }
                }(sink)
            }
            wg.Wait()
        }
    }
}
Critical design decision: sink failures don’t block other sinks. If ClickHouse is down, PostgreSQL and Kafka still receive events.
Batch Insert Sink
The PostgreSQL sink uses COPY for maximum throughput:
type PostgresSink struct {
    pool *pgxpool.Pool
}

func (s *PostgresSink) Write(ctx context.Context, events []Event) error {
    _, err := s.pool.CopyFrom(ctx,
        pgx.Identifier{"events"},
        []string{"id", "type", "user_id", "payload", "created_at", "received_at"},
        pgx.CopyFromSlice(len(events), func(i int) ([]interface{}, error) {
            e := events[i]
            return []interface{}{
                e.ID, e.Type, e.UserID, e.Payload, e.CreatedAt, e.ReceivedAt,
            }, nil
        }),
    )
    return err
}

func (s *PostgresSink) Name() string { return "postgres" }
COPY is orders of magnitude faster than individual INSERT statements for bulk loading.
Wiring It Together
func main() {
    ctx, cancel := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    events := make(chan Event, 10_000)
    batches := make(chan []Event, 100)

    ingester := &Ingester{events: events}
    batcher := &Batcher{
        input:         events,
        output:        batches,
        batchSize:     500,
        flushInterval: time.Second,
    }
    fanout := &FanOut{
        input: batches,
        sinks: []Sink{
            // pgPool, kafkaProducer, and chConn are initialized elsewhere.
            &PostgresSink{pool: pgPool},
            &KafkaSink{producer: kafkaProducer},
            &ClickHouseSink{conn: chConn},
        },
    }

    g, ctx := errgroup.WithContext(ctx)
    g.Go(func() error { return runHTTP(ctx, ingester) })
    g.Go(func() error { batcher.Run(ctx); return nil })
    g.Go(func() error { fanout.Run(ctx); return nil })

    if err := g.Wait(); err != nil {
        slog.Error("pipeline error", "error", err)
    }
}
Each stage runs as a goroutine. Channels provide the backpressure mechanism: if sinks slow down, the batches channel fills up, the batcher blocks, the events channel fills up, and the API starts returning 503.
Numbers
With this architecture on a single Go process:
- Ingestion: 50K events/second
- Batch insert: 10K events/second to PostgreSQL via COPY
- End-to-end latency: P50 under 200ms, P99 under 2s
- Memory: Stable at ~200MB regardless of throughput
Scaling horizontally is straightforward — run multiple instances behind a load balancer. Each instance is stateless.
Channels are Go’s killer feature for pipeline architectures. They give you concurrency, batching, buffering, and backpressure with clean, readable code.