Event-Driven Microservices with Kafka: Loose Coupling Done Right

Backend Developer | Golang & Python I enjoy building reliable APIs, distributed systems, and automation tools. Writing here about backend engineering, system design, and real-world dev experiences.

Introduction

Here's a mistake I made early on: building microservices that call each other directly.

Order Service calls Payment Service. Payment Service calls Inventory Service. Inventory Service calls Notification Service. Everything is synchronous. Everything is coupled.

Then one service goes down and the entire flow breaks. Or you get cascading failures. Or you're spending hours debugging which service is causing the timeout.

That's when I learned about event-driven architecture. Services don't call each other—they publish events. Other services subscribe and react. Loose coupling, async processing, built-in resilience.

In this post, I'll show you how to build event-driven microservices using Kafka in Go. We'll cover:

  • Why events over direct calls

  • Kafka setup and integration

  • Publishing and consuming events

  • Event schemas and versioning

  • Error handling and retries

  • Dead letter queues

This is what I wish someone had shown me before I built my first microservices system.

Why Event-Driven Architecture?

Let's compare two approaches for creating an order:

Synchronous (direct calls):

func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) error {
    // Create order
    order := NewOrder(req)
    if err := s.repo.Save(ctx, order); err != nil {
        return err
    }

    // Call payment service
    payment, err := s.paymentClient.ProcessPayment(ctx, order.ID, order.Total)
    if err != nil {
        return err  // order saved but payment failed
    }

    // Call inventory service
    if err := s.inventoryClient.ReserveStock(ctx, order.Items); err != nil {
        // Now we need to refund the payment...
        s.paymentClient.RefundPayment(ctx, payment.ID)
        return err
    }

    // Call notification service: if it's down, the whole order fails
    if err := s.notificationClient.SendConfirmation(ctx, order.CustomerID); err != nil {
        return err
    }

    return nil
}

Problems:

  • Tight coupling: Order service knows about every downstream service

  • Synchronous: User waits for everything to complete

  • Cascading failures: If notifications are down, orders fail

  • Hard to add services: Want to add a loyalty points service? Modify order service code.

Event-Driven (async):

func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) error {
    // Create order
    order := NewOrder(req)
    if err := s.repo.Save(ctx, order); err != nil {
        return err
    }

    // Publish event
    event := OrderCreatedEvent{
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        Items:      order.Items,
        Total:      order.Total,
        Timestamp:  time.Now(),
    }
    if err := s.publisher.Publish("order.created", event); err != nil {
        return err
    }

    return nil  // Done! Other services will handle the rest.
}

Benefits:

  • Loose coupling: Order service doesn't know what happens next

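  • Async: The user gets a response as soon as the order is saved and the event is published

  • Resilient: If the payment service is down, Kafka keeps the events (for the topic's retention period) and the service catches up when it's back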

  • Extensible: Add new subscribers without touching existing code

Payment service subscribes and reacts:

func (c *PaymentConsumer) HandleOrderCreated(event OrderCreatedEvent) error {
    payment, err := c.paymentService.ProcessPayment(event.OrderID, event.Total)
    if err != nil {
        c.publisher.Publish("order.payment.failed", event.OrderID)
        return err
    }

    c.publisher.Publish("order.payment.completed", payment)
    return nil
}

This is the power of events: services react to things that happen, instead of being told what to do.
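To make the decoupling concrete without running a broker, here is a minimal in-memory sketch. It is not Kafka (no persistence, no partitions, synchronous delivery), and the `Bus` type is purely illustrative. What it does show: the publisher only knows a topic name, and a new subscriber such as a loyalty service can be added without touching the order code.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a toy, in-memory stand-in for a broker: it maps topic names to
// subscriber callbacks. No persistence, no partitions, no retries.
type Bus struct {
	mu   sync.RWMutex
	subs map[string][]func(payload string)
}

func NewBus() *Bus {
	return &Bus{subs: make(map[string][]func(string))}
}

// Subscribe registers a handler for a topic.
func (b *Bus) Subscribe(topic string, handler func(payload string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], handler)
}

// Publish delivers the payload to every current subscriber of the topic and
// returns how many there were. The publisher never learns who is listening.
func (b *Bus) Publish(topic, payload string) int {
	b.mu.RLock()
	handlers := append([]func(string){}, b.subs[topic]...)
	b.mu.RUnlock()
	for _, h := range handlers {
		h(payload)
	}
	return len(handlers)
}

func main() {
	bus := NewBus()
	var log []string

	bus.Subscribe("order.created", func(p string) { log = append(log, "payment: charge "+p) })
	bus.Subscribe("order.created", func(p string) { log = append(log, "inventory: reserve "+p) })
	// Adding a loyalty service later needs no change to the publisher:
	bus.Subscribe("order.created", func(p string) { log = append(log, "loyalty: award points for "+p) })

	n := bus.Publish("order.created", "order-42")
	fmt.Println(n, "subscribers notified")
	for _, line := range log {
		fmt.Println(line)
	}
}
```

Kafka adds what this toy lacks: events are persisted, so a subscriber that is down catches up later instead of missing them.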

Setting Up Kafka

Kafka is one of the most widely used event streaming platforms. It's distributed, persists events to replicated logs, and scales to very high throughput by adding partitions and brokers.

Run Kafka locally with Docker:

# docker-compose.yml
# Images are pinned to Confluent Platform 7.x: 8.0 and later run in KRaft
# mode only and no longer support this ZooKeeper setup, so ":latest" breaks it.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start it:

docker-compose up -d

Install the Go Kafka client:

go get github.com/IBM/sarama

Publishing Events

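Here's a simple synchronous Kafka producer. It waits for all in-sync replicas to acknowledge each message and retries transient send failures: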

package kafka

import (
    "encoding/json"
    "github.com/IBM/sarama"
)

type Producer struct {
    producer sarama.SyncProducer
}

func NewProducer(brokers []string) (*Producer, error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll  // Wait for all in-sync replicas to acknowledge
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }

    return &Producer{producer: producer}, nil
}

func (p *Producer) Publish(topic string, event interface{}) error {
    // Serialize event to JSON
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // Create message
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.ByteEncoder(eventBytes),  // already []byte, no string conversion needed
    }

    // Send synchronously
    _, _, err = p.producer.SendMessage(msg)
    return err
}

func (p *Producer) Close() error {
    return p.producer.Close()
}

Using it in your service:

type OrderService struct {
    repo      OrderRepository
    publisher *kafka.Producer
}

func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) (*Order, error) {
    order, err := NewOrder(req.CustomerID, req.Items)
    if err != nil {
        return nil, err
    }

    // Save to database
    if err := s.repo.Create(ctx, order); err != nil {
        return nil, err
    }

    // Publish event
    event := OrderCreatedEvent{
        OrderID:    order.ID().Hex(),
        CustomerID: order.CustomerID(),
        Items:      req.Items,
        Total:      order.TotalAmount(),
        Timestamp:  time.Now(),
    }

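    if err := s.publisher.Publish("order.created", event); err != nil {
        // Log the error but don't fail the request: the order is already saved.
        // Caution: nothing here retries the publish, so this event is lost
        // unless a separate mechanism (e.g., a transactional outbox) resends it.
        logger.Error("failed to publish event", zap.Error(err))
    }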

    return order, nil
}

Event Schemas

Define clear event structures. Other services depend on these:

// order_events.go
type OrderCreatedEvent struct {
    OrderID    string      `json:"order_id"`
    CustomerID string      `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    Total      float64     `json:"total"`
    Timestamp  time.Time   `json:"timestamp"`
}

type OrderCompletedEvent struct {
    OrderID   string    `json:"order_id"`
    Timestamp time.Time `json:"timestamp"`
}

type OrderFailedEvent struct {
    OrderID   string    `json:"order_id"`
    Reason    string    `json:"reason"`
    Timestamp time.Time `json:"timestamp"`
}

type OrderItem struct {
    ProductID string  `json:"product_id"`
    Quantity  int     `json:"quantity"`
    Price     float64 `json:"price"`
}

Best practices:

  • Use JSON for compatibility (easy to consume from any language)

  • Include timestamps (for debugging and auditing; clocks differ between services, so don't rely on them for ordering)

  • Make events immutable (never change published events)

  • Version your events (add a version field for breaking changes)

Consuming Events

Here's a consumer that handles order events:

package kafka

import (
    "context"

    "github.com/IBM/sarama"
    "go.uber.org/zap"
)

type Consumer struct {
    consumer sarama.ConsumerGroup
    handler  MessageHandler
}

type MessageHandler interface {
    Handle(ctx context.Context, topic string, message []byte) error
}

func NewConsumer(brokers []string, groupID string, topics []string, handler MessageHandler) (*Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
    if err != nil {
        return nil, err
    }

    c := &Consumer{
        consumer: consumer,
        handler:  handler,
    }

    go c.consume(context.Background(), topics)

    return c, nil
}

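func (c *Consumer) consume(ctx context.Context, topics []string) {
    for {
        // Consume blocks for one group session and returns on every rebalance,
        // so it has to be called in a loop
        err := c.consumer.Consume(ctx, topics, c)
        if err == sarama.ErrClosedConsumerGroup {
            return  // group was closed: stop instead of spinning forever
        }
        if err != nil {
            logger.Error("consumer error", zap.Error(err))
        }
        if ctx.Err() != nil {
            return  // context cancelled: shut down
        }
    }
}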

// Setup is called when a new session starts
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes messages
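func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        // Cancelled when the session ends (rebalance or shutdown)
        ctx := session.Context()

        // Handle message
        err := c.handler.Handle(ctx, message.Topic, message.Value)
        if err != nil {
            logger.Error("failed to handle message",
                zap.String("topic", message.Topic),
                zap.Int64("offset", message.Offset),
                zap.Error(err),
            )
            // Caution: not marking does NOT trigger a retry. The loop moves on,
            // and once a later message is marked, the committed offset moves
            // past this one. See retries and dead letter queues below.
            continue
        }

        // Mark message as consumed
        session.MarkMessage(message, "")
    }

    return nil
}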

Payment service handler:

type PaymentEventHandler struct {
    paymentService *PaymentService
    publisher      *kafka.Producer
}

func (h *PaymentEventHandler) Handle(ctx context.Context, topic string, message []byte) error {
    switch topic {
    case "order.created":
        var event OrderCreatedEvent
        if err := json.Unmarshal(message, &event); err != nil {
            return err
        }
        return h.handleOrderCreated(ctx, event)

    default:
        return nil  // ignore unknown topics
    }
}

func (h *PaymentEventHandler) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
    logger.Info("processing payment for order", zap.String("order_id", event.OrderID))

    payment, err := h.paymentService.ProcessPayment(ctx, event.OrderID, event.Total)
    if err != nil {
        // Publish failure event
        failureEvent := OrderPaymentFailedEvent{
            OrderID:   event.OrderID,
            Reason:    err.Error(),
            Timestamp: time.Now(),
        }
        h.publisher.Publish("order.payment.failed", failureEvent)
        return err
    }

    // Publish success event
    successEvent := OrderPaymentCompletedEvent{
        OrderID:   event.OrderID,
        PaymentID: payment.ID,
        Timestamp: time.Now(),
    }
    h.publisher.Publish("order.payment.completed", successEvent)

    logger.Info("payment processed successfully", zap.String("order_id", event.OrderID))
    return nil
}

Error Handling and Retries

What happens when processing fails?

Option 1: Retry with Backoff

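func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    const maxAttempts = 3

    for message := range claim.Messages() {
        // Cancelled when the session ends (rebalance or shutdown)
        ctx := session.Context()

        // Try up to maxAttempts times
        var err error
        for attempt := 0; attempt < maxAttempts; attempt++ {
            err = c.handler.Handle(ctx, message.Topic, message.Value)
            if err == nil {
                break  // success
            }
            if attempt == maxAttempts-1 {
                break  // last attempt failed: no point sleeping again
            }

            // Exponential backoff: 1s, then 2s. Stop waiting if the session ends;
            // the message stays unmarked, so the partition's next owner resumes
            // from the last committed offset and sees it again.
            select {
            case <-time.After(time.Duration(1<<attempt) * time.Second):
            case <-ctx.Done():
                return nil
            }
        }

        if err != nil {
            // All attempts failed: park the message in the dead letter queue
            c.sendToDeadLetterQueue(message)
        }

        // Always mark as consumed so one bad message can't block the partition
        session.MarkMessage(message, "")
    }

    return nil
}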

Option 2: Dead Letter Queue

Send failed messages to a separate topic for manual review:

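// Assumes the Consumer struct also holds a `producer sarama.SyncProducer`
// field (not part of the struct shown earlier)
func (c *Consumer) sendToDeadLetterQueue(message *sarama.ConsumerMessage) error {
    dlqTopic := message.Topic + ".dlq"

    dlqMessage := &sarama.ProducerMessage{
        Topic: dlqTopic,
        Value: sarama.ByteEncoder(message.Value),
        Headers: []sarama.RecordHeader{
            {Key: []byte("original-topic"), Value: []byte(message.Topic)},
            {Key: []byte("error-time"), Value: []byte(time.Now().Format(time.RFC3339))},
        },
    }

    if _, _, err := c.producer.SendMessage(dlqMessage); err != nil {
        logger.Error("failed to send message to DLQ", zap.String("topic", dlqTopic), zap.Error(err))
        return err
    }

    logger.Warn("message sent to DLQ", zap.String("topic", dlqTopic))
    return nil
}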

Idempotency: Handling Duplicates

Kafka consumers typically get at-least-once delivery: offsets are committed after processing, so a crash or rebalance can redeliver a message you already handled. Your handlers need to be idempotent.

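type PaymentService struct {
    repo          PaymentRepository
    processedIDs  *redis.Client  // track processed events
}

func (s *PaymentService) ProcessPayment(ctx context.Context, orderID string, amount float64) (*Payment, error) {
    // Check if already processed
    key := fmt.Sprintf("processed:payment:%s", orderID)
    exists, err := s.processedIDs.Exists(ctx, key).Result()
    if err != nil {
        return nil, err  // can't tell: fail so the message is retried
    }
    if exists > 0 {
        logger.Info("payment already processed", zap.String("order_id", orderID))
        return s.repo.FindByOrderID(ctx, orderID)
    }

    // Process payment.
    // Caution: check-then-set is not atomic. Concurrent duplicates, or a crash
    // between Create and Set, can still slip through, so back this with a
    // unique constraint on order_id in the payments table.
    payment := &Payment{
        OrderID: orderID,
        Amount:  amount,
        Status:  "completed",
    }

    if err := s.repo.Create(ctx, payment); err != nil {
        return nil, err
    }

    // Mark as processed (expires in 24 hours; later duplicates aren't caught)
    if err := s.processedIDs.Set(ctx, key, "1", 24*time.Hour).Err(); err != nil {
        logger.Warn("failed to mark payment as processed", zap.String("order_id", orderID), zap.Error(err))
    }

    return payment, nil
}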

Event Versioning

As your system evolves, events change. Handle this gracefully:

type OrderCreatedEvent struct {
    Version    int         `json:"version"`  // v1, v2, etc.
    OrderID    string      `json:"order_id"`
    CustomerID string      `json:"customer_id"`
    Items      []OrderItem `json:"items"`
    Total      float64     `json:"total"`
    Timestamp  time.Time   `json:"timestamp"`

    // v2 fields
    ShippingAddress *Address `json:"shipping_address,omitempty"`
}

func (h *PaymentEventHandler) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
    switch event.Version {
    case 0, 1:
        // Events published before the version field existed decode with
        // Version == 0; treat them as v1 instead of rejecting them
        return h.handleOrderCreatedV1(ctx, event)
    case 2:
        return h.handleOrderCreatedV2(ctx, event)
    default:
        return fmt.Errorf("unsupported event version: %d", event.Version)
    }
}
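When versions differ in shape, not just by added optional fields, one struct holding every field gets awkward. An alternative is to decode only the version first, then unmarshal into a version-specific struct. The types and `describeOrderCreated` below are hypothetical; a missing version decodes as 0, which is treated as v1 here so events published before versioning are still accepted.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// versionHeader decodes only the version field, ignoring everything else.
type versionHeader struct {
	Version int `json:"version"`
}

type orderCreatedV1 struct {
	OrderID string `json:"order_id"`
}

type orderCreatedV2 struct {
	OrderID         string `json:"order_id"`
	ShippingAddress struct {
		City string `json:"city"`
	} `json:"shipping_address"`
}

// describeOrderCreated peeks at the version, then decodes into the matching struct.
func describeOrderCreated(raw []byte) (string, error) {
	var h versionHeader
	if err := json.Unmarshal(raw, &h); err != nil {
		return "", err
	}
	switch h.Version {
	case 0, 1: // 0 means the field was absent: a pre-versioning event
		var ev orderCreatedV1
		if err := json.Unmarshal(raw, &ev); err != nil {
			return "", err
		}
		return "v1 order " + ev.OrderID, nil
	case 2:
		var ev orderCreatedV2
		if err := json.Unmarshal(raw, &ev); err != nil {
			return "", err
		}
		return "v2 order " + ev.OrderID + " to " + ev.ShippingAddress.City, nil
	default:
		return "", fmt.Errorf("unsupported event version: %d", h.Version)
	}
}

func main() {
	for _, raw := range []string{
		`{"order_id":"1"}`,
		`{"version":1,"order_id":"2"}`,
		`{"version":2,"order_id":"3","shipping_address":{"city":"Oslo"}}`,
		`{"version":9,"order_id":"4"}`,
	} {
		out, err := describeOrderCreated([]byte(raw))
		fmt.Println(out, err)
	}
}
```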

Monitoring and Observability

Add metrics to track event processing:

var (
    eventsProcessed = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "kafka_events_processed_total",
            Help: "Total events processed",
        },
        []string{"topic", "status"},
    )

    processingDuration = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "kafka_event_processing_duration_seconds",
            Help: "Event processing duration",
        },
        []string{"topic"},
    )
)

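// instrumentedHandler wraps any kafka.MessageHandler (e.g. *PaymentEventHandler)
// and records metrics around each call. Pass it to NewConsumer instead of the
// bare handler: instrumentedHandler{next: paymentHandler}
type instrumentedHandler struct {
    next kafka.MessageHandler
}

func (h instrumentedHandler) Handle(ctx context.Context, topic string, message []byte) error {
    start := time.Now()

    err := h.next.Handle(ctx, topic, message)

    status := "success"
    if err != nil {
        status = "error"
    }

    eventsProcessed.WithLabelValues(topic, status).Inc()
    processingDuration.WithLabelValues(topic).Observe(time.Since(start).Seconds())

    return err
}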

Common Pitfalls

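  1. Not handling duplicates: Always make handlers idempotent.

  2. Blocking consumers: A slow handler holds up every message behind it on the same partition. Keep handlers fast; if you offload work to background workers, ordering and offset commits then need extra care.

  3. No dead letter queue: Failed messages are either skipped silently or retried forever, blocking the partition.

  4. Ignoring ordering: Kafka guarantees order per partition, not across partitions. Give related events the same message key (e.g., the order ID) so they land on the same partition.

  5. Too many consumers: Within a consumer group, each partition is assigned to at most one consumer, so consumers beyond the partition count sit idle.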
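For pitfall 4, related events stay in order only if they share a partition. With sarama, that means setting `Key` on the `ProducerMessage` (for example `Key: sarama.StringEncoder(orderID)`); the `Publish` method shown earlier doesn't set one. The stdlib-only sketch below approximates how a hash partitioner maps keys. It uses FNV-1a like sarama's default hash partitioner, but other clients (the Java client uses murmur2) can pick different partitions for the same key, and `partitionFor` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a message key to a partition using a 32-bit FNV-1a hash,
// similar in spirit to sarama's default hash partitioner.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // writes to a hash.Hash never return an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	events := []struct{ key, name string }{
		{"order-1", "created"}, {"order-2", "created"},
		{"order-1", "paid"}, {"order-1", "shipped"}, {"order-2", "paid"},
	}
	// Events sharing a key always land on the same partition, so a consumer
	// of that partition sees them in the order they were produced.
	byPartition := map[int32][]string{}
	for _, e := range events {
		p := partitionFor(e.key, 6)
		byPartition[p] = append(byPartition[p], e.key+":"+e.name)
	}
	fmt.Println(byPartition)
}
```

Changing the number of partitions changes this mapping, so add partitions with care if you rely on per-key ordering.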

Wrapping Up

Event-driven architecture transforms microservices from tightly coupled spaghetti to loosely coupled, resilient systems.

Start simple:

  1. Pick one flow (e.g., order creation)

  2. Replace direct calls with events

  3. Add error handling and retries

  4. Monitor event processing

Once you see the benefits—resilience, scalability, extensibility—you'll wonder how you ever built systems without events.

Next up: The Saga pattern for handling distributed transactions across services.

Questions? Drop a comment. Always happy to talk about event-driven architecture.

Thanks for reading! This is part of my microservices series. Follow along for more posts on sagas, caching, and distributed transactions.

eshah.dev
