Event-Driven Systems in Go with Redis: Pub/Sub and Streams

Hi, I’m Oluwatosin Thompson, a Backend Engineer specialising in Go (Golang) and passionate about creating reliable, scalable, and efficient backend systems that power great products.
I’ve built and maintained production-grade services using Temporal, Prometheus, and AWS, with a strong emphasis on performance optimisation, distributed systems, and clean, maintainable code. My work reflects a balance between technical excellence and real-world business impact.
I enjoy sharing my learning journey through technical writing and documenting how I solve engineering challenges. Known for being teachable, adaptable, and detail-oriented, I’m always eager to take on complex problems and deliver elegant solutions.
I’m committed to engineering growth, collaboration, and continuous improvement, with a long-term goal of building impactful systems used worldwide.
TL;DR
Use Redis Pub/Sub for ephemeral fan-out (broadcast to online consumers; no persistence).
Use Redis Streams + Consumer Groups for durable, at-least-once processing, retries, backpressure, and scaling.
Model Domain Events in your domain (DDD), publish them in the application layer, and keep Redis-specific details in the infrastructure layer.
Implement idempotency, retries, dead-lettering, and graceful shutdown.
Introduction
The first time I dabbled with Redis Pub/Sub was about two years ago - also my very first attempt at building an Event-Driven Design (EDD) system. It was an experiment, full of “aha” moments. You can still find that original implementation in this repository, and looking back now, I can see how far I’ve come.😀
Since then, I’ve had the chance to work with multiple message brokers, gain a better understanding of event-driven architectures, and refine my approach, though I’m still learning every day.
This article is a follow-up of sorts: a better approach to using Redis Pub/Sub and Streams in a Domain-Driven Design (DDD) setting. It’s not just theory. We’ll go hands-on with a full implementation that includes idempotency, retries, dead-lettering, and graceful shutdown.
Event-Driven Design (EDD) is about building systems that react to events instead of making direct calls. This creates loosely coupled services that communicate via messages, enabling scalability, resilience, and flexibility.
Redis is often thought of as an in-memory key-value store, but it is also a powerful message broker with support for Pub/Sub and Streams. Combined with Go’s concurrency features, Redis becomes an excellent foundation for building real-time, event-driven architectures.
1. Event-Driven Design in a Nutshell
Instead of calling a function directly, in EDD we emit an event that other services can react to.
Example flow:
User Service emits a UserRegistered event after a new user signs up.
Email Service listens for UserRegistered and sends a welcome email.
Analytics Service listens for UserRegistered and logs stats.
Services don’t know about each other; they only know about events.
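To make the idea concrete, here is a minimal sketch of what a UserRegistered event could look like in Go, along with the encode/decode step every publisher and consumer needs. The field names and JSON tags are my own assumptions for illustration, not a fixed contract.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// UserRegistered is a hypothetical domain event. The fields are
// illustrative; shape yours around what consumers actually need.
type UserRegistered struct {
	EventID    string    `json:"event_id"` // unique per event, used later for idempotency
	UserID     string    `json:"user_id"`
	Email      string    `json:"email"`
	OccurredAt time.Time `json:"occurred_at"`
}

// Encode serialises the event so it can travel over any broker.
func Encode(e UserRegistered) ([]byte, error) {
	return json.Marshal(e)
}

// Decode is what a consumer would call on the raw payload it receives.
func Decode(data []byte) (UserRegistered, error) {
	var e UserRegistered
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	evt := UserRegistered{
		EventID:    "evt-1",
		UserID:     "user-42",
		Email:      "ada@example.com",
		OccurredAt: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
	}

	payload, err := Encode(evt)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	got, err := Decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println("decoded email:", got.Email)
}
```

The producer and the consumers share only this event shape, which is what keeps the Email and Analytics services unaware of the User Service.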
2. Redis as the Event Backbone
Redis offers two primary patterns for event-driven communication:
Pub/Sub - Fire-and-forget messaging. No persistence. Consumers must be online to get the message.
Streams - Persistent log of events with delivery guarantees and replay support.
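The difference between the two patterns can be modelled without Redis at all. The toy sketch below is only an in-memory analogy (the Broadcast and Log types are hypothetical, not Redis APIs): a broadcast reaches whoever is subscribed at publish time, while a log keeps entries so a late reader can catch up.

```go
package main

import "fmt"

// Broadcast models Pub/Sub: a message reaches only the subscribers
// that are registered at the moment of publishing.
type Broadcast struct {
	subscribers []func(string)
}

func (b *Broadcast) Subscribe(fn func(string)) {
	b.subscribers = append(b.subscribers, fn)
}

func (b *Broadcast) Publish(msg string) {
	for _, fn := range b.subscribers {
		fn(msg)
	}
}

// Log models a stream: entries are appended and kept, and each reader
// supplies its own offset, so a late reader can still catch up.
type Log struct {
	entries []string
}

func (l *Log) Append(msg string) {
	l.entries = append(l.entries, msg)
}

// ReadFrom returns a copy of every entry at or after offset.
func (l *Log) ReadFrom(offset int) []string {
	if offset < 0 {
		offset = 0
	}
	if offset >= len(l.entries) {
		return nil
	}
	return append([]string(nil), l.entries[offset:]...)
}

func main() {
	var b Broadcast
	b.Publish("UserRegistered:1") // nobody is listening yet, so this is lost

	var received []string
	b.Subscribe(func(m string) { received = append(received, m) })
	b.Publish("UserRegistered:2")
	fmt.Println("pub/sub subscriber saw:", received)

	var l Log
	l.Append("UserRegistered:1")
	l.Append("UserRegistered:2")
	fmt.Println("late stream reader saw:", l.ReadFrom(0))
}
```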
3. When to Choose Pub/Sub vs Streams
| Feature | Pub/Sub | Streams |
| --- | --- | --- |
| Persistence | None; messages are lost if the consumer is offline | Messages are stored until trimmed |
| Delivery guarantee | At most once | At least once (effectively exactly-once when consumers are idempotent) |
| Backpressure | No control | Consumer groups process at their own pace |
| Replay | Not possible | Possible; old messages can be re-read |
| Use cases | Live chat, notifications, in-memory events | Event sourcing, task queues, audit logs |
| Commands | PUBLISH, SUBSCRIBE | XADD, XREADGROUP, XACK |
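At-least-once delivery means a consumer may see the same event more than once, so the handler has to tolerate duplicates. Below is a minimal in-memory sketch of that idea. The Deduper and Delivery types are hypothetical stand-ins; in a real deployment the "seen" record would need to live in a shared store. Note the simplification: the ID is recorded before the handler runs, so a crash mid-handler would skip the event on redelivery.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers which event IDs have been handled. It is an
// in-memory stand-in for a shared store.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime reports whether eventID is new, recording it if so.
func (d *Deduper) FirstTime(eventID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[eventID]; ok {
		return false
	}
	d.seen[eventID] = struct{}{}
	return true
}

// Delivery is a hypothetical message as handed to a consumer.
type Delivery struct {
	EventID string
	Body    string
}

// Consume applies handle once per distinct EventID, even when the
// same event is delivered again.
func Consume(d *Deduper, deliveries []Delivery, handle func(Delivery)) {
	for _, m := range deliveries {
		if !d.FirstTime(m.EventID) {
			continue // duplicate delivery, skip it
		}
		handle(m)
	}
}

func main() {
	redelivered := []Delivery{
		{EventID: "evt-1", Body: "welcome ada"},
		{EventID: "evt-2", Body: "welcome bob"},
		{EventID: "evt-1", Body: "welcome ada"}, // redelivery of evt-1
	}

	sent := 0
	Consume(NewDeduper(), redelivered, func(Delivery) { sent++ })
	fmt.Println("emails sent:", sent)
}
```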
4. Go DDD Folder Structure for EDD
Here’s how we can structure an event-driven system with Redis in Go while following Domain-Driven Design principles.
/user-service
├── cmd/
│   └── main.go                  # Application entrypoint
├── internal/
│   ├── domain/                  # Core business logic (pure Go, no Redis)
│   │   ├── entity/
│   │   │   └── user.go          # User entity
│   │   ├── event/
│   │   │   └── events.go        # Domain events (UserRegistered, etc.)
│   │   ├── valueobject/
│   │   │   └── email.go         # Email value object
│   │   └── service.go           # Domain services
│   ├── application/             # Application layer
│   │   └── user_service.go      # Orchestrates domain logic & events
│   ├── infrastructure/          # Frameworks, DB, Redis, etc.
│   │   ├── redis/
│   │   │   ├── pubsub.go        # Pub/Sub publisher/subscriber
│   │   │   └── streams.go       # Streams producer/consumer
│   │   └── repository.go        # Persistence (DB, cache)
│   └── interfaces/              # Controllers, CLI, HTTP handlers
│       └── http/
│           └── user_handler.go
└── go.mod
5. Implementing Pub/Sub in Go with Redis
Pub/Sub is great when:
All consumers are online
Messages don’t need to be stored
You need real-time push
pubsub.go:
package redis

import (
	"context"
	"log"

	"github.com/redis/go-redis/v9"
)

const (
	// PubSubChannel is the channel name for our Pub/Sub events.
	PubSubChannel = "user-events"
)

// PubSubPublisher publishes events to a Redis Pub/Sub channel.
type PubSubPublisher struct {
	client *redis.Client
}

// NewPubSubPublisher creates a new PubSubPublisher.
func NewPubSubPublisher(client *redis.Client) *PubSubPublisher {
	return &PubSubPublisher{client: client}
}

// Publish sends a message to the PubSubChannel channel.
func (p *PubSubPublisher) Publish(ctx context.Context, data []byte) error {
	return p.client.Publish(ctx, PubSubChannel, data).Err()
}

// PubSubSubscriber listens for events on a Redis Pub/Sub channel.
type PubSubSubscriber struct {
	client *redis.Client
}

// NewPubSubSubscriber creates a new PubSubSubscriber.
func NewPubSubSubscriber(client *redis.Client) *PubSubSubscriber {
	return &PubSubSubscriber{client: client}
}

// Subscribe blocks, passing each message to handler until ctx is cancelled.
func (s *PubSubSubscriber) Subscribe(ctx context.Context, handler func(context.Context, []byte) error) {
	pubsub := s.client.Subscribe(ctx, PubSubChannel)
	defer pubsub.Close()

	ch := pubsub.Channel()
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// A closed channel yields nil messages; stop instead of dereferencing one.
				log.Println("Pub/Sub channel closed.")
				return
			}
			if err := handler(ctx, []byte(msg.Payload)); err != nil {
				log.Printf("Error processing Pub/Sub message: %v", err)
			}
		case <-ctx.Done():
			log.Println("Pub/Sub subscriber shutting down.")
			return
		}
	}
}
6. Implementing Streams in Go with Redis
Streams are great when:
Consumers may be offline
You need message persistence
You need multiple consumers, each tracking its own position in the stream
streams.go:
package redis

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

const (
	// StreamName is the name of our Redis Stream.
	StreamName = "user-stream"
	// ConsumerGroup is the name of the consumer group.
	ConsumerGroup = "user-service-group"
	// DLQStreamName is the name of the Dead-Letter Queue stream.
	DLQStreamName = "user-dlq-stream"
	// MaxRetries is the maximum number of processing attempts before moving to the DLQ.
	MaxRetries = 5
)

// StreamProducer produces events to a Redis Stream.
type StreamProducer struct {
	client *redis.Client
}

// NewStreamProducer creates a new StreamProducer.
func NewStreamProducer(client *redis.Client) *StreamProducer {
	return &StreamProducer{client: client}
}

// Produce adds a message to the stream.
func (p *StreamProducer) Produce(ctx context.Context, eventID string, data []byte) error {
	message := map[string]interface{}{
		"eventID": eventID,
		"payload": data,
	}
	return p.client.XAdd(ctx, &redis.XAddArgs{
		Stream: StreamName,
		Values: message,
	}).Err()
}

// StreamConsumer consumes events from a Redis Stream.
type StreamConsumer struct {
	client        *redis.Client
	consumerName  string
	processedKeys *redis.Client // Separate client for idempotency
}

// NewStreamConsumer creates a new StreamConsumer.
func NewStreamConsumer(client *redis.Client, consumerName string) (*StreamConsumer, error) {
	// Ensure the consumer group exists
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err := client.XGroupCreateMkStream(ctx, StreamName, ConsumerGroup, "0").Err()
	// A BUSYGROUP error means the group already exists, which is fine.
	// Match on the prefix rather than the full message text.
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		return nil, fmt.Errorf("failed to create consumer group: %w", err)
	}

	// Create a separate Redis client for idempotency bookkeeping.
	// Copy all options (password, DB, TLS, ...) so it connects like the main client.
	opts := *client.Options()
	processedKeysClient := redis.NewClient(&opts)

	return &StreamConsumer{
		client:        client,
		consumerName:  consumerName,
		processedKeys: processedKeysClient,
	}, nil
}
// GracefulShutdown runs the consumer until ctx is cancelled or the process
// receives SIGINT or SIGTERM, then lets Consume return cleanly.
func (c *StreamConsumer) GracefulShutdown(ctx context.Context, handler func(context.Context, []byte) error) {
	log.Printf("Starting Stream consumer [%s]", c.consumerName)

	// NotifyContext cancels ctx on the first signal; stop releases the signal handler.
	ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stop()

	c.Consume(ctx, handler)
}
// Consume starts consuming messages from the stream.
func (c *StreamConsumer) Consume(ctx context.Context, handler func(context.Context, []byte) error) {
	defer func() {
		if err := c.processedKeys.Close(); err != nil {
			log.Printf("Error closing processedKeys client: %v", err)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			log.Println("Stream consumer shutting down.")
			return
		default:
		}

		// ">" asks for messages that were never delivered to any consumer in the group.
		// It does not re-deliver entries already pending for this consumer; recovering
		// those would need a read from ID "0" or XAUTOCLAIM.
		streams, err := c.client.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    ConsumerGroup,
			Consumer: c.consumerName,
			Streams:  []string{StreamName, ">"},
			Count:    1,
			Block:    1 * time.Second, // Block for 1 second if no new messages
		}).Result()
		if err != nil {
			if errors.Is(err, redis.Nil) {
				continue // Block timed out with no new messages.
			}
			if ctx.Err() != nil {
				continue // Cancelled mid-read; the check at the top of the loop exits.
			}
			log.Printf("Error reading from stream: %v", err)
			// Wait before retrying, but wake up early on shutdown.
			select {
			case <-time.After(5 * time.Second):
			case <-ctx.Done():
			}
			continue
		}

		for _, stream := range streams {
			for _, message := range stream.Messages {
				c.processMessage(ctx, message, handler)
			}
		}
	}
}
// processMessage handles a single stream message with idempotency, retries, and DLQ.
func (c *StreamConsumer) processMessage(ctx context.Context, message redis.XMessage, handler func(context.Context, []byte) error) {
	eventID, ok := message.Values["eventID"].(string)
	if !ok || eventID == "" {
		log.Printf("Skipping message with invalid eventID: %s", message.ID)
		c.ackMessage(ctx, message.ID)
		return
	}

	// The payload does not change between attempts, so validate it once.
	payload, ok := message.Values["payload"].(string)
	if !ok {
		log.Printf("Invalid payload in message %s", message.ID)
		c.ackMessage(ctx, message.ID)
		return
	}

	// --- Idempotency Check ---
	isProcessed, err := c.isMessageProcessed(ctx, eventID)
	if err != nil {
		log.Printf("Error checking idempotency for event %s: %v", eventID, err)
		// We can't guarantee idempotency, but we'll try to process it anyway.
	}
	if isProcessed {
		log.Printf("Event %s already processed. Acknowledging message %s.", eventID, message.ID)
		c.ackMessage(ctx, message.ID)
		return
	}

	// --- Retry Logic ---
	var lastErr error
	for i := 0; i < MaxRetries; i++ {
		err = handler(ctx, []byte(payload))
		if err == nil {
			// Success! Mark as processed first, so that a failed ack only causes
			// a redelivery that the idempotency check will skip.
			c.markMessageAsProcessed(ctx, eventID)
			c.ackMessage(ctx, message.ID)
			log.Printf("Successfully processed message %s (event %s)", message.ID, eventID)
			return
		}
		lastErr = err
		if i == MaxRetries-1 {
			break // No point waiting after the final attempt.
		}

		// Linear backoff: 2s, 4s, 6s, ...
		delay := time.Duration(i+1) * 2 * time.Second
		log.Printf("Attempt %d failed for message %s: %v. Retrying in %s...", i+1, message.ID, err, delay)
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			// Shutting down: leave the message unacknowledged in the pending entries list.
			log.Printf("Shutdown while retrying message %s; leaving it pending.", message.ID)
			return
		}
	}

	// --- Dead-Lettering ---
	// Note: moveToDLQ only logs on failure, so the message is acknowledged
	// even if the DLQ write did not succeed.
	log.Printf("Message %s (event %s) failed after %d attempts: %v. Moving to DLQ.", message.ID, eventID, MaxRetries, lastErr)
	c.moveToDLQ(ctx, message.Values)
	c.ackMessage(ctx, message.ID)
}
// ackMessage acknowledges a message to remove it from the pending entries list.
func (c *StreamConsumer) ackMessage(ctx context.Context, messageID string) {
	err := c.client.XAck(ctx, StreamName, ConsumerGroup, messageID).Err()
	if err != nil {
		log.Printf("Error acknowledging message %s: %v", messageID, err)
	}
}
// isMessageProcessed checks if an event ID has been processed before.
func (c *StreamConsumer) isMessageProcessed(ctx context.Context, eventID string) (bool, error) {
	// Each processed event gets its own key so that it can expire independently.
	n, err := c.processedKeys.Exists(ctx, "processed_events:"+eventID).Result()
	if err != nil {
		return false, err
	}
	return n > 0, nil
}

// markMessageAsProcessed records an event ID as processed for 24 hours.
// A per-event key is used rather than one shared set because EXPIRE applies to a
// whole key: a TTL on the set would be pushed back by every write and could never
// remove individual IDs. Per-key TTLs let old IDs expire and keep memory bounded.
func (c *StreamConsumer) markMessageAsProcessed(ctx context.Context, eventID string) {
	err := c.processedKeys.Set(ctx, "processed_events:"+eventID, 1, 24*time.Hour).Err()
	if err != nil {
		log.Printf("Error marking event %s as processed: %v", eventID, err)
	}
}
// moveToDLQ copies a failed message to the Dead-Letter Queue stream.
func (c *StreamConsumer) moveToDLQ(ctx context.Context, values map[string]interface{}) {
	// Copy the fields so the caller's map is not modified.
	dlqValues := make(map[string]interface{}, len(values)+2)
	for k, v := range values {
		dlqValues[k] = v
	}
	dlqValues["original_stream"] = StreamName
	dlqValues["failure_timestamp"] = time.Now().Format(time.RFC3339)

	err := c.client.XAdd(ctx, &redis.XAddArgs{
		Stream: DLQStreamName,
		Values: dlqValues,
	}).Err()
	if err != nil {
		log.Printf("Failed to move message to DLQ: %v", err)
	}
}
7. Complete Implementation
I have added comments to help you understand the entire codebase. Follow the project structure and the README.md file to learn how to navigate and run this project. Visit the GitHub repo to see the complete implementation.
8. Final Thoughts
Pub/Sub:
Pros: Low latency, simple, real-time
Cons: No persistence, no replay
Use: Notifications, live dashboards, ephemeral events
Streams:
Pros: Persistent, replayable, backpressure control
Cons: More complex, storage overhead
Use: Event sourcing, queues, guaranteed delivery
By combining DDD for clear separation of concerns with Redis for efficient event distribution, you can build scalable, resilient Go systems that are easy to evolve.





