Event-Driven Microservices with Kafka: Loose Coupling Done Right
Introduction
Here's a mistake I made early on: building microservices that call each other directly.
Order Service calls Payment Service. Payment Service calls Inventory Service. Inventory Service calls Notification Service. Everything is synchronous. Everything is coupled.
Then one service goes down and the entire flow breaks. Or you get cascading failures. Or you're spending hours debugging which service is causing the timeout.
That's when I learned about event-driven architecture. Services don't call each other—they publish events. Other services subscribe and react. Loose coupling, async processing, built-in resilience.
In this post, I'll show you how to build event-driven microservices using Kafka in Go. We'll cover:
Why events over direct calls
Kafka setup and integration
Publishing and consuming events
Event schemas and versioning
Error handling and retries
Dead letter queues
This is what I wish someone had shown me before I built my first microservices system.
Why Event-Driven Architecture?
Let's compare two approaches for creating an order:
Synchronous (direct calls):
func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) error {
// Create order
order := NewOrder(req)
s.repo.Save(order)
// Call payment service
payment, err := s.paymentClient.ProcessPayment(order.ID, order.Total)
if err != nil {
return err // order created but payment failed
}
// Call inventory service
err = s.inventoryClient.ReserveStock(order.Items)
if err != nil {
// Now we need to refund the payment...
s.paymentClient.RefundPayment(payment.ID)
return err
}
// Call notification service
s.notificationClient.SendConfirmation(order.CustomerID)
return nil
}
Problems:
Tight coupling: Order service knows about every downstream service
Synchronous: User waits for everything to complete
Cascading failures: If notifications are down, orders fail
Hard to add services: Want to add a loyalty points service? Modify order service code.
Event-Driven (async):
func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) error {
// Create order
order := NewOrder(req)
s.repo.Save(order)
// Publish event
event := OrderCreatedEvent{
OrderID: order.ID,
CustomerID: order.CustomerID,
Items: order.Items,
Total: order.Total,
Timestamp: time.Now(),
}
s.publisher.Publish("order.created", event)
return nil // Done! Other services will handle the rest.
}
Benefits:
Loose coupling: Order service doesn't know what happens next
Async: User gets instant response
Resilient: If payment service is down, Kafka buffers events
Extensible: Add new subscribers without touching existing code
Payment service subscribes and reacts:
func (c *PaymentConsumer) HandleOrderCreated(event OrderCreatedEvent) error {
payment, err := c.paymentService.ProcessPayment(event.OrderID, event.Total)
if err != nil {
c.publisher.Publish("order.payment.failed", event.OrderID)
return err
}
c.publisher.Publish("order.payment.completed", payment)
return nil
}
This is the power of events: services react to things that happen, instead of being told what to do.
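To make the decoupling concrete, here is a minimal sketch that swaps Kafka for a toy in-memory bus. `Bus`, `Subscribe`, and `Publish` are names I made up for illustration, and delivery here is synchronous, unlike a real broker. The point is only that a second subscriber can be added without touching the publishing code.

```go
package main

import (
	"fmt"
	"sync"
)

// Bus is a toy in-memory stand-in for Kafka, used only to illustrate decoupling.
type Bus struct {
	mu       sync.RWMutex
	handlers map[string][]func(payload string)
}

// NewBus returns an empty bus.
func NewBus() *Bus {
	return &Bus{handlers: make(map[string][]func(payload string))}
}

// Subscribe registers a handler for a topic.
func (b *Bus) Subscribe(topic string, h func(payload string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish hands the payload to every handler registered for the topic and
// returns how many handlers received it. The publisher never names them.
func (b *Bus) Publish(topic, payload string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, h := range b.handlers[topic] {
		h(payload)
	}
	return len(b.handlers[topic])
}

func main() {
	bus := NewBus()
	bus.Subscribe("order.created", func(id string) {
		fmt.Println("payment: charging order", id)
	})
	// Added later, with no change to the publishing side.
	bus.Subscribe("order.created", func(id string) {
		fmt.Println("loyalty: awarding points for", id)
	})

	n := bus.Publish("order.created", "order-42")
	fmt.Println("delivered to", n, "subscribers")
}
```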
Setting Up Kafka
Kafka is one of the most widely used event streaming platforms. It's distributed, it persists events to disk so consumers can catch up later, and a well-provisioned cluster can handle millions of messages per second.
Run Kafka locally with Docker:
# docker-compose.yml
# Note: ZooKeeper mode was removed in Kafka 4.0. If the latest images fail to
# start with this config, pin both images to a ZooKeeper-era tag.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Start it:
docker-compose up -d
Install the Go Kafka client:
go get github.com/IBM/sarama
Publishing Events
Here's a Kafka producer configured for durability (acks from all in-sync replicas, plus retries):
package kafka
import (
"encoding/json"
"github.com/IBM/sarama"
)
type Producer struct {
producer sarama.SyncProducer
}
func NewProducer(brokers []string) (*Producer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all replicas
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
return nil, err
}
return &Producer{producer: producer}, nil
}
func (p *Producer) Publish(topic string, event interface{}) error {
// Serialize event to JSON
eventBytes, err := json.Marshal(event)
if err != nil {
return err
}
// Create message
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(eventBytes), // eventBytes is already a []byte
}
// Send synchronously
_, _, err = p.producer.SendMessage(msg)
return err
}
func (p *Producer) Close() error {
return p.producer.Close()
}
Using it in your service:
type OrderService struct {
repo OrderRepository
publisher *kafka.Producer
}
func (s *OrderService) CreateOrder(ctx context.Context, req CreateOrderRequest) (*Order, error) {
order, err := NewOrder(req.CustomerID, req.Items)
if err != nil {
return nil, err
}
// Save to database
if err := s.repo.Create(ctx, order); err != nil {
return nil, err
}
// Publish event
event := OrderCreatedEvent{
OrderID: order.ID().Hex(),
CustomerID: order.CustomerID(),
Items: req.Items,
Total: order.TotalAmount(),
Timestamp: time.Now(),
}
if err := s.publisher.Publish("order.created", event); err != nil {
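// Log the error but don't fail the request: the order is already saved.
// Caveat: nothing in this snippet republishes the event. Without a separate
// retry mechanism (for example, an outbox table), the event is lost.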
logger.Error("failed to publish event", zap.Error(err))
}
return order, nil
}
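The "retry mechanism" mentioned in that comment has to exist somewhere. One common shape for it is an outbox: record the event next to the order, then publish from the outbox until the broker accepts it. Below is a minimal in-memory sketch of the idea. `Outbox`, `PendingEvent`, and `Flush` are hypothetical names, and a real outbox would be a database table written in the same transaction as the order, not a slice.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBrokerDown simulates an unreachable broker in this sketch.
var errBrokerDown = errors.New("broker unavailable")

// PendingEvent is an event that was recorded but not yet accepted by the broker.
type PendingEvent struct {
	Topic   string
	Payload []byte
}

// Outbox holds events until a publish succeeds.
type Outbox struct {
	mu      sync.Mutex
	pending []PendingEvent
}

// Add records an event for later publishing.
func (o *Outbox) Add(e PendingEvent) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.pending = append(o.pending, e)
}

// Len reports how many events are still waiting.
func (o *Outbox) Len() int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return len(o.pending)
}

// Flush publishes pending events in order and stops at the first failure, so
// the failed event and everything after it stay queued for the next run.
// It returns the number of events published.
func (o *Outbox) Flush(publish func(topic string, payload []byte) error) int {
	o.mu.Lock()
	defer o.mu.Unlock()
	sent := 0
	for sent < len(o.pending) {
		e := o.pending[sent]
		if err := publish(e.Topic, e.Payload); err != nil {
			break
		}
		sent++
	}
	o.pending = o.pending[sent:]
	return sent
}

func main() {
	outbox := &Outbox{}
	outbox.Add(PendingEvent{Topic: "order.created", Payload: []byte(`{"order_id":"42"}`)})

	// First pass: the broker is unreachable, so the event stays queued.
	down := func(string, []byte) error { return errBrokerDown }
	fmt.Println("published:", outbox.Flush(down), "pending:", outbox.Len())

	// Later pass: the broker is back and the event goes out.
	up := func(string, []byte) error { return nil }
	fmt.Println("published:", outbox.Flush(up), "pending:", outbox.Len())
}
```

Because a flush can be retried after a partial failure, consumers may see the same event twice, which is one more reason handlers need to be idempotent.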
Event Schemas
Define clear event structures. Other services depend on these:
// order_events.go
type OrderCreatedEvent struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []OrderItem `json:"items"`
Total float64 `json:"total"`
Timestamp time.Time `json:"timestamp"`
}
type OrderCompletedEvent struct {
OrderID string `json:"order_id"`
Timestamp time.Time `json:"timestamp"`
}
type OrderFailedEvent struct {
OrderID string `json:"order_id"`
Reason string `json:"reason"`
Timestamp time.Time `json:"timestamp"`
}
type OrderItem struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
Best practices:
Use JSON for compatibility (easy to consume from any language)
Include timestamps (for debugging and ordering)
Make events immutable (never change published events)
Version your events (add a version field for breaking changes)
Consuming Events
Here's a consumer that handles order events:
package kafka
import (
"context"
"encoding/json"
"github.com/IBM/sarama"
)
type Consumer struct {
consumer sarama.ConsumerGroup
handler MessageHandler
}
type MessageHandler interface {
Handle(ctx context.Context, topic string, message []byte) error
}
func NewConsumer(brokers []string, groupID string, topics []string, handler MessageHandler) (*Consumer, error) {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
return nil, err
}
c := &Consumer{
consumer: consumer,
handler: handler,
}
go c.consume(context.Background(), topics)
return c, nil
}
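func (c *Consumer) consume(ctx context.Context, topics []string) {
for {
// Consume returns on every rebalance, so it has to be called in a loop.
err := c.consumer.Consume(ctx, topics, c)
if err == sarama.ErrClosedConsumerGroup || ctx.Err() != nil {
return // group closed or context cancelled: stop instead of spinning
}
if err != nil {
logger.Error("consumer error", zap.Error(err))
}
}
}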
// Setup is called when a new session starts
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { return nil }
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim processes messages
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
ctx := session.Context() // cancelled when the session ends (rebalance or shutdown)
// Handle message
err := c.handler.Handle(ctx, message.Topic, message.Value)
if err != nil {
logger.Error("failed to handle message",
zap.String("topic", message.Topic),
zap.Error(err),
)
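// Skipping MarkMessage does NOT retry this message: the next successful
// MarkMessage moves the committed offset past it. Real retries are
// covered under Error Handling and Retries.
continue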
}
// Mark message as consumed
session.MarkMessage(message, "")
}
return nil
}
Payment service handler:
type PaymentEventHandler struct {
paymentService *PaymentService
publisher *kafka.Producer
}
func (h *PaymentEventHandler) Handle(ctx context.Context, topic string, message []byte) error {
switch topic {
case "order.created":
var event OrderCreatedEvent
if err := json.Unmarshal(message, &event); err != nil {
return err
}
return h.handleOrderCreated(ctx, event)
default:
return nil // ignore unknown topics
}
}
func (h *PaymentEventHandler) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
logger.Info("processing payment for order", zap.String("order_id", event.OrderID))
payment, err := h.paymentService.ProcessPayment(ctx, event.OrderID, event.Total)
if err != nil {
// Publish failure event
failureEvent := OrderPaymentFailedEvent{
OrderID: event.OrderID,
Reason: err.Error(),
Timestamp: time.Now(),
}
h.publisher.Publish("order.payment.failed", failureEvent)
return err
}
// Publish success event
successEvent := OrderPaymentCompletedEvent{
OrderID: event.OrderID,
PaymentID: payment.ID,
Timestamp: time.Now(),
}
h.publisher.Publish("order.payment.completed", successEvent)
logger.Info("payment processed successfully", zap.String("order_id", event.OrderID))
return nil
}
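A nice side effect of handlers that take a topic and raw bytes: they can be exercised without a broker. Here is a minimal sketch of that idea. `Charger`, `Publisher`, `fakeCharger`, and `recorder` are stand-ins I invented for the payment service and producer, and the event payloads are simplified to the order ID.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

var errCardDeclined = errors.New("card declined")

type OrderCreatedEvent struct {
	OrderID string  `json:"order_id"`
	Total   float64 `json:"total"`
}

// Charger and Publisher are hypothetical seams for the payment service and producer.
type Charger interface {
	Charge(ctx context.Context, orderID string, amount float64) error
}

type Publisher interface {
	Publish(topic string, event interface{}) error
}

type Handler struct {
	charger Charger
	pub     Publisher
}

// Handle decodes the event, charges the order, and publishes the outcome.
func (h *Handler) Handle(ctx context.Context, topic string, msg []byte) error {
	if topic != "order.created" {
		return nil // ignore unknown topics
	}
	var ev OrderCreatedEvent
	if err := json.Unmarshal(msg, &ev); err != nil {
		return fmt.Errorf("decode %s: %w", topic, err)
	}
	if err := h.charger.Charge(ctx, ev.OrderID, ev.Total); err != nil {
		if pubErr := h.pub.Publish("order.payment.failed", ev.OrderID); pubErr != nil {
			return fmt.Errorf("%w (publishing the failure also failed: %v)", err, pubErr)
		}
		return err
	}
	return h.pub.Publish("order.payment.completed", ev.OrderID)
}

// fakeCharger fails on demand; recorder remembers which topics were published.
type fakeCharger struct{ fail bool }

func (f fakeCharger) Charge(context.Context, string, float64) error {
	if f.fail {
		return errCardDeclined
	}
	return nil
}

type recorder struct{ topics []string }

func (r *recorder) Publish(topic string, _ interface{}) error {
	r.topics = append(r.topics, topic)
	return nil
}

// run is a small helper that feeds a JSON string to the handler.
func run(h *Handler, topic, body string) error {
	return h.Handle(context.Background(), topic, []byte(body))
}

func main() {
	rec := &recorder{}
	h := &Handler{charger: fakeCharger{fail: true}, pub: rec}
	err := run(h, "order.created", `{"order_id":"order-42","total":19.99}`)
	fmt.Println("error:", err, "published:", rec.topics)
}
```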
Error Handling and Retries
What happens when processing fails?
Option 1: Retry with Backoff
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
ctx := context.Background()
// Retry up to 3 times
var err error
for attempt := 0; attempt < 3; attempt++ {
err = c.handler.Handle(ctx, message.Topic, message.Value)
if err == nil {
break // success
}
// Exponential backoff: 1s, then 2s (no sleep after the final attempt)
if attempt < 2 {
time.Sleep(time.Duration(1<<attempt) * time.Second)
}
}
if err != nil {
// After 3 failed attempts, send to dead letter queue
c.sendToDeadLetterQueue(message)
}
// Always mark as consumed (don't block the consumer)
session.MarkMessage(message, "")
}
return nil
}
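The retry loop above sleeps unconditionally, which holds up shutdown and rebalances. A variation I'd consider is pulling the retry into a helper that caps the delay and stops as soon as the context is cancelled. This is a sketch, not a drop-in replacement: `retry`, `backoff`, and `retryFast` are hypothetical names, and the limits are arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient failure")

// backoff returns base doubled once per attempt, capped at max.
func backoff(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// retry calls fn up to attempts times, waiting between failures.
// It gives up early if ctx is cancelled and does not sleep after the last try.
func retry(ctx context.Context, attempts int, base, max time.Duration, fn func() error) error {
	var err error
	for attempt := 0; attempt < attempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt == attempts-1 {
			break
		}
		select {
		case <-time.After(backoff(attempt, base, max)):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

// retryFast runs retry with millisecond delays, handy for demos and tests.
func retryFast(attempts int, fn func() error) error {
	return retry(context.Background(), attempts, time.Millisecond, 10*time.Millisecond, fn)
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Println("attempt", attempt, "waits", backoff(attempt, time.Second, 10*time.Second))
	}

	calls := 0
	err := retryFast(3, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

In ConsumeClaim, the context to pass would be the session's, so retries stop when the partition is reassigned.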
Option 2: Dead Letter Queue
Send messages that still fail after retries to a separate topic for manual review. This is the sendToDeadLetterQueue helper that Option 1 calls:
func (c *Consumer) sendToDeadLetterQueue(message *sarama.ConsumerMessage) {
dlqTopic := message.Topic + ".dlq"
dlqMessage := &sarama.ProducerMessage{
Topic: dlqTopic,
Value: sarama.ByteEncoder(message.Value),
Headers: []sarama.RecordHeader{
{Key: []byte("original-topic"), Value: []byte(message.Topic)},
{Key: []byte("error-time"), Value: []byte(time.Now().Format(time.RFC3339))},
},
}
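// Assumes Consumer also holds a sarama.SyncProducer in a producer field.
if _, _, err := c.producer.SendMessage(dlqMessage); err != nil {
logger.Error("failed to send message to DLQ", zap.String("topic", dlqTopic), zap.Error(err))
return
}
logger.Warn("message sent to DLQ", zap.String("topic", dlqTopic))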
}
Idempotency: Handling Duplicates
Kafka can deliver messages more than once (at-least-once delivery). Your handlers need to be idempotent.
type PaymentService struct {
repo PaymentRepository
processedIDs *redis.Client // track processed events
}
func (s *PaymentService) ProcessPayment(ctx context.Context, orderID string, amount float64) (*Payment, error) {
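// Check if already processed. Note: this check and the Set below are
// separate steps, so two concurrent deliveries can both pass the check.
key := fmt.Sprintf("processed:payment:%s", orderID)
exists, err := s.processedIDs.Exists(ctx, key).Result()
if err != nil {
return nil, err
}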
if exists > 0 {
logger.Info("payment already processed", zap.String("order_id", orderID))
return s.repo.FindByOrderID(ctx, orderID)
}
// Process payment
payment := &Payment{
OrderID: orderID,
Amount: amount,
Status: "completed",
}
if err := s.repo.Create(ctx, payment); err != nil {
return nil, err
}
// Mark as processed (expires in 24 hours)
s.processedIDs.Set(ctx, key, "1", 24*time.Hour)
return payment, nil
}
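One gap in the check-then-set approach: two deliveries of the same event arriving at the same moment can both see "not processed". A way to close it is to claim the key atomically before doing the work, which is what a set-if-absent operation gives you. The sketch below mimics that in memory with a mutex. `ProcessedStore`, `Claim`, `Release`, and `processOnce` are hypothetical names, not the Redis client API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errGateway = errors.New("payment gateway error")

// ProcessedStore is an in-memory stand-in for an atomic set-if-absent store.
type ProcessedStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func NewProcessedStore() *ProcessedStore {
	return &ProcessedStore{seen: make(map[string]bool)}
}

// Claim returns true only for the first caller with a given key.
func (s *ProcessedStore) Claim(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[key] {
		return false
	}
	s.seen[key] = true
	return true
}

// Release undoes a claim so a failed attempt can be retried later.
func (s *ProcessedStore) Release(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.seen, key)
}

// processOnce runs charge at most once per order ID across concurrent callers.
// It reports whether this call actually performed the charge.
func processOnce(store *ProcessedStore, orderID string, charge func() error) (bool, error) {
	key := "processed:payment:" + orderID
	if !store.Claim(key) {
		return false, nil // duplicate delivery, nothing to do
	}
	if err := charge(); err != nil {
		store.Release(key)
		return false, err
	}
	return true, nil
}

func main() {
	store := NewProcessedStore()
	var charges int32
	var wg sync.WaitGroup

	// Simulate the same event being delivered ten times at once.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			processOnce(store, "order-42", func() error {
				atomic.AddInt32(&charges, 1)
				return nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("charges:", atomic.LoadInt32(&charges))
}
```

Claiming first has its own trade-off: a crash between the claim and the charge leaves the key set with no payment made. A unique constraint on the order ID in the payments table is a sturdier backstop if your database supports it.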
Event Versioning
As your system evolves, events change. Handle this gracefully:
type OrderCreatedEvent struct {
Version int `json:"version"` // v1, v2, etc.
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Items []OrderItem `json:"items"`
Total float64 `json:"total"`
Timestamp time.Time `json:"timestamp"`
// v2 fields
ShippingAddress *Address `json:"shipping_address,omitempty"`
}
func (h *PaymentEventHandler) handleOrderCreated(ctx context.Context, event OrderCreatedEvent) error {
switch event.Version {
case 0, 1: // 0 means the event predates the version field
return h.handleOrderCreatedV1(ctx, event)
case 2:
return h.handleOrderCreatedV2(ctx, event)
default:
return fmt.Errorf("unsupported event version: %d", event.Version)
}
}
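One detail that bites during rollout: events published before the version field existed decode with a version of 0. Here is a small sketch of peeking at the version first and treating 0 as version 1. `envelope`, `eventVersion`, and `route` are illustrative names, and the returned strings stand in for real handlers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope decodes only the version field and ignores everything else.
type envelope struct {
	Version int `json:"version"`
}

// eventVersion reads the version from a payload. Events published before the
// field existed decode as 0 and are treated as version 1.
func eventVersion(data []byte) (int, error) {
	var env envelope
	if err := json.Unmarshal(data, &env); err != nil {
		return 0, err
	}
	if env.Version == 0 {
		return 1, nil
	}
	return env.Version, nil
}

// route picks a handler name by version; a real consumer would call the handler.
func route(data []byte) (string, error) {
	v, err := eventVersion(data)
	if err != nil {
		return "", err
	}
	switch v {
	case 1:
		return "v1 handler", nil
	case 2:
		return "v2 handler", nil
	default:
		return "", fmt.Errorf("unsupported event version: %d", v)
	}
}

func main() {
	payloads := []string{
		`{"order_id":"a"}`,
		`{"version":2,"order_id":"b"}`,
		`{"version":3,"order_id":"c"}`,
	}
	for _, p := range payloads {
		name, err := route([]byte(p))
		fmt.Println(p, "->", name, err)
	}
}
```

Unsupported versions return an error rather than being dropped, so they can flow into the retry and dead letter path instead of vanishing.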
Monitoring and Observability
Add metrics to track event processing:
var (
eventsProcessed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_events_processed_total",
Help: "Total events processed",
},
[]string{"topic", "status"},
)
processingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "kafka_event_processing_duration_seconds",
Help: "Event processing duration",
},
[]string{"topic"},
)
)
func (h *PaymentEventHandler) Handle(ctx context.Context, topic string, message []byte) error {
start := time.Now()
err := h.actualHandler(ctx, topic, message)
status := "success"
if err != nil {
status = "error"
}
eventsProcessed.WithLabelValues(topic, status).Inc()
processingDuration.WithLabelValues(topic).Observe(time.Since(start).Seconds())
return err
}
Common Pitfalls
Not handling duplicates: Always make handlers idempotent.
Blocking consumers: Don't do slow operations in handlers (delegate to workers).
No dead letter queue: Failed messages disappear forever.
Ignoring ordering: Kafka guarantees order per partition, not across partitions.
Too many consumers: Consumers in a group beyond the topic's partition count sit idle, so size each group to the partition count.
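On the ordering pitfall: the Publish function shown earlier sets no key on its messages, so events for one order are not tied to a single partition. Setting the message key to the order ID is the usual fix, since a hash-based partitioner (which I believe is sarama's default) sends equal keys to the same partition. The sketch below shows the idea with a stable hash. `partitionFor` is a hypothetical helper and is not claimed to match any client's exact algorithm.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition using a stable hash, so the same key
// always lands on the same partition for a fixed partition count.
func partitionFor(key string, numPartitions int) int {
	if numPartitions <= 0 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Both events for order-1 map to the same partition, so their relative
	// order is preserved for whichever consumer owns that partition.
	for _, orderID := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> partition %d\n", orderID, partitionFor(orderID, 6))
	}
}
```

Note that the mapping changes if the partition count changes, so adding partitions to a live topic can break per-key ordering for events in flight.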
Wrapping Up
Event-driven architecture transforms microservices from tightly coupled spaghetti to loosely coupled, resilient systems.
Start simple:
Pick one flow (e.g., order creation)
Replace direct calls with events
Add error handling and retries
Monitor event processing
Once you see the benefits—resilience, scalability, extensibility—you'll wonder how you ever built systems without events.
Next up: The Saga pattern for handling distributed transactions across services.
Questions? Drop a comment. Always happy to talk about event-driven architecture.
Thanks for reading! This is part of my microservices series. Follow along for more posts on sagas, caching, and distributed transactions.