When we needed to build a real-time data platform for a live event timing company, the requirements were clear: events arrive from timing hardware, and subscribers — broadcast systems, mobile apps, live displays — need to see that data immediately. Not in 5 seconds. Not after a polling interval. Instantly.
This post walks through how we designed the real-time streaming layer using FastAPI's native WebSocket support and Redis Pub/Sub as the broadcast backbone.
## The Problem
Our platform ingests events from timing hardware via HTTP POST. Multiple clients need to receive those events in real-time via WebSocket connections. The challenge:
- Fan-out — One incoming event might need to go to dozens of connected subscribers.
- Scalability — We can't assume all WebSocket connections live on the same server instance.
- Authorization — Subscribers should only receive events for channels they're permitted to see.
- Reliability — Dead connections shouldn't block broadcasts to healthy ones.
A naive approach — keeping a list of WebSocket connections in memory and iterating through them — breaks down the moment you scale beyond a single server. You need a message broker.
## Why Redis Pub/Sub?
We considered several options:
| Option | Pros | Cons |
|---|---|---|
| In-memory broadcast | Simple, fast | Single-server only, no horizontal scaling |
| PostgreSQL LISTEN/NOTIFY | Already have Postgres | Limited throughput, not designed for high fan-out |
| RabbitMQ / Kafka | Battle-tested, durable | Operational overhead, overkill for our use case |
| Redis Pub/Sub | Fast, simple, already using Redis | No persistence (fine for real-time) |
Redis was already in our stack for rate limiting and API key validation caching. Adding Pub/Sub meant zero additional infrastructure. And for real-time streaming where we don't need message persistence (if you miss it, it's gone), Redis Pub/Sub is ideal.
## The Architecture
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Device    │────▶│  POST /v1/  │────▶│ PostgreSQL  │
│ (Publisher) │     │   events    │     │  (storage)  │
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │
                           │ PUBLISH
                           ▼
                    ┌─────────────┐
                    │    Redis    │
                    │   Pub/Sub   │
                    └──────┬──────┘
                           │
             ┌─────────────┼─────────────┐
             │ SUBSCRIBE   │ SUBSCRIBE   │ SUBSCRIBE
             ▼             ▼             ▼
      ┌───────────┐ ┌───────────┐ ┌───────────┐
      │ WebSocket │ │ WebSocket │ │ WebSocket │
      │ Client 1  │ │ Client 2  │ │ Client 3  │
      └───────────┘ └───────────┘ └───────────┘
```
When an event is ingested:
1. Store it in PostgreSQL (durability, historical queries)
2. Publish it to Redis on a channel-specific topic
3. All server instances subscribed to that topic receive the message
4. Each instance broadcasts to its connected WebSocket clients
## Implementation
We built a `WebSocketManager` class that handles connection lifecycle and message distribution:
```python
import asyncio
from collections import defaultdict

from fastapi import WebSocket
from redis.asyncio import Redis


class WebSocketManager:
    """Manages WebSocket connections and broadcasts."""

    def __init__(self, redis: Redis) -> None:
        self._redis = redis
        self._connections: dict[str, set[WebSocket]] = defaultdict(set)
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket, channels: list[str]) -> None:
        """Register a WebSocket for specific channels."""
        await websocket.accept()
        async with self._lock:
            for channel in channels:
                self._connections[channel].add(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        """Remove a WebSocket from all channels."""
        async with self._lock:
            for channel_connections in self._connections.values():
                channel_connections.discard(websocket)

    async def broadcast(self, channel: str, message: dict) -> None:
        """Send a message to all connections subscribed to a channel."""
        async with self._lock:
            # Union builds a fresh set, so we iterate outside the lock.
            # Include wildcard subscribers (admins connected with "*").
            connections = (
                self._connections.get(channel, set())
                | self._connections.get("*", set())
            )

        # Broadcast outside the lock to avoid blocking
        dead_connections = []
        for websocket in connections:
            try:
                await websocket.send_json(message)
            except Exception:
                dead_connections.append(websocket)

        # Clean up dead connections
        if dead_connections:
            async with self._lock:
                for ws in dead_connections:
                    for channel_connections in self._connections.values():
                        channel_connections.discard(ws)
```
Key decisions:
- Lock for connection management — Adding and removing connections can interleave with broadcasts, so we guard the registry with an `asyncio.Lock` to prevent race conditions.
- Copy before broadcast — We copy the connection set inside the lock, then iterate outside it, so we're not holding the lock during potentially slow network operations.
- Silent failure handling — If a send fails, we mark the connection as dead and clean it up. No exceptions bubble up to disrupt other broadcasts.
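The copy-then-broadcast pattern is easy to exercise without a real network. Here's a minimal sketch using a stub in place of a WebSocket (`StubSocket` is invented for illustration): one healthy connection and one that fails mid-send, showing that the failure neither blocks the healthy delivery nor survives cleanup.

```python
import asyncio


class StubSocket:
    """Stand-in for a WebSocket: records messages, optionally fails on send."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.received: list[dict] = []

    async def send_json(self, message: dict) -> None:
        if self.fail:
            raise ConnectionError("peer went away")
        self.received.append(message)


async def demo() -> tuple[list[dict], set]:
    healthy, dying = StubSocket(), StubSocket(fail=True)
    connections = {healthy, dying}

    # Broadcast: failures are collected, not raised
    dead = []
    for ws in connections:
        try:
            await ws.send_json({"lap": 7})
        except Exception:
            dead.append(ws)

    # Cleanup: dead sockets leave the registry
    for ws in dead:
        connections.discard(ws)
    return healthy.received, connections


received, remaining = asyncio.run(demo())
```

The healthy socket receives the message even though its neighbor blew up, and only one connection remains registered afterward.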
## The Redis Subscriber
A background task listens to Redis and triggers broadcasts:
```python
import json

from redis.asyncio import Redis


async def redis_subscriber(manager: WebSocketManager, redis: Redis) -> None:
    """Listen to Redis Pub/Sub and broadcast to WebSocket clients."""
    pubsub = redis.pubsub()
    await pubsub.psubscribe("events:*")  # Subscribe to all event channels

    async for message in pubsub.listen():
        if message["type"] != "pmessage":
            continue

        # Extract channel name from "events:channel-name"
        channel = message["channel"].decode().split(":", 1)[1]
        data = json.loads(message["data"])
        await manager.broadcast(channel, data)
```
We use `psubscribe` (pattern subscribe) with `events:*` so we don't need to know all channels upfront. Any new channel automatically gets picked up.
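The parsing step in the subscriber loop can be checked in isolation. This sketch builds a dict shaped like the `pmessage` entries redis-py's `pubsub.listen()` yields (byte strings by default; the payload values here are made up) and extracts the channel and data the same way:

```python
import json

# A pmessage as redis-py delivers it (keys are str, values are bytes by default)
message = {
    "type": "pmessage",
    "pattern": b"events:*",
    "channel": b"events:finish-line",
    "data": b'{"racer": 42, "lap": 7}',
}

# Same extraction as the subscriber loop
channel = message["channel"].decode().split(":", 1)[1]
data = json.loads(message["data"])
```

`split(":", 1)` matters: it splits only on the first colon, so channel names containing colons survive intact.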
## The WebSocket Endpoint
```python
from fastapi import APIRouter, Query, WebSocket, WebSocketDisconnect

router = APIRouter()


@router.websocket("/ws/stream")
async def websocket_stream(
    websocket: WebSocket,
    token: str = Query(...),
    channels: list[str] = Query(default=[]),
) -> None:
    """Real-time event stream via WebSocket."""
    # Validate API key and get permissions
    principal = await auth_service.validate_key(token)
    if not principal or principal.role not in (Role.ADMIN, Role.SUBSCRIBER):
        await websocket.close(code=4003)
        return

    # Determine which channels this client can access
    if principal.role == Role.ADMIN:
        allowed_channels = channels or ["*"]  # Admin sees all
    else:
        # Intersection of requested and permitted channels
        permitted = set(principal.channels)
        requested = set(channels) if channels else permitted
        allowed_channels = list(permitted & requested)

    if not allowed_channels:
        await websocket.close(code=4003)
        return

    # Connect and wait for disconnect
    await manager.connect(websocket, allowed_channels)
    try:
        while True:
            # Keep connection alive, handle pings
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(websocket)
```
The authorization logic ensures:
- Only `admin` and `subscriber` roles can connect
- Subscribers only receive events for channels they're permitted to see
- If a subscriber requests specific channels, we intersect with their permissions
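That channel-resolution logic is pure set arithmetic, so it's worth distilling into a standalone function you can unit-test (the `resolve_channels` helper and its string roles are invented for illustration; the endpoint above inlines the same logic):

```python
def resolve_channels(
    role: str, permitted: list[str], requested: list[str]
) -> list[str]:
    """Channels a client may stream, per the rules above."""
    if role == "admin":
        return requested or ["*"]  # Admins see everything by default
    # Subscribers: intersect requested with permitted;
    # no explicit request means "everything I'm allowed to see"
    allowed = set(permitted) & (set(requested) if requested else set(permitted))
    return sorted(allowed)
```

An empty result (a subscriber asking only for channels they can't see) is what triggers the `4003` close in the endpoint.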
## Publishing Events
When an event is ingested, we publish to Redis after storing:
```python
async def ingest_event(event: EventCreate, principal: Principal) -> EventResponse:
    """Ingest an event and broadcast to subscribers."""
    # Store in database
    stored_event = await event_repo.upsert(event)

    # Broadcast via Redis
    await redis.publish(
        f"events:{event.channel}",
        stored_event.model_dump_json(),
    )
    return stored_event
```
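The publish call itself is just string plumbing: build the `events:{channel}` topic, serialize the payload. A quick sanity check of that convention, with a stand-in event dict (the `publish_args` helper is illustrative, not part of the service):

```python
import json


def publish_args(channel: str, event: dict) -> tuple[str, str]:
    """Build the (topic, payload) pair handed to redis.publish."""
    return f"events:{channel}", json.dumps(event)


topic, payload = publish_args("finish-line", {"racer": 42, "lap": 7})
```

Whatever goes in here comes out of `json.loads(message["data"])` on the subscriber side, so the two ends must agree on this serialization.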
## Why This Scales
This architecture scales horizontally:
- Multiple API instances — Each instance subscribes to Redis. When an event is published, all instances receive it and broadcast to their local WebSocket connections.
- No session affinity required — A client can connect to any instance. Redis ensures they receive events regardless of which instance ingested them.
- Redis handles fan-out — We don't implement our own message routing. Redis Pub/Sub does the heavy lifting.
- Graceful degradation — If a WebSocket connection dies mid-broadcast, we clean it up without affecting others.
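The no-session-affinity property can be sketched in-process: below, a toy bus stands in for Redis, and two "instances" each keep their own local client registries. One publish reaches both instances, which is exactly why a client can connect anywhere (`ToyBus` and `Instance` are invented for this sketch):

```python
import asyncio
from collections import defaultdict


class ToyBus:
    """In-process stand-in for Redis Pub/Sub fan-out."""

    def __init__(self) -> None:
        self.subscribers = []

    async def publish(self, channel: str, message: dict) -> None:
        # Every subscribed instance gets every message
        for callback in self.subscribers:
            await callback(channel, message)


class Instance:
    """One API server: holds its own local set of delivered messages."""

    def __init__(self, bus: ToyBus) -> None:
        self.local = defaultdict(list)  # channel -> messages delivered locally
        bus.subscribers.append(self.on_message)

    async def on_message(self, channel: str, message: dict) -> None:
        self.local[channel].append(message)


async def demo() -> tuple[list, list]:
    bus = ToyBus()
    a, b = Instance(bus), Instance(bus)
    await bus.publish("finish-line", {"lap": 1})
    return a.local["finish-line"], b.local["finish-line"]


got_a, got_b = asyncio.run(demo())
```

Both instances see the publish regardless of which one would have ingested the event; each then delivers only to its own local WebSocket connections.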
## Trade-offs
This approach has trade-offs worth understanding:
No persistence — Redis Pub/Sub is fire-and-forget. If a subscriber disconnects and reconnects, they miss events that happened in between. For our use case (live event data), this is acceptable. For other use cases, you might need Redis Streams or a proper message queue.
Memory pressure — Every connected WebSocket holds memory. At massive scale (tens of thousands of connections), you'd want to monitor memory and potentially implement connection limits.
Single Redis instance — For this deployment, one Redis instance is fine. At larger scale, you'd consider Redis Cluster or a managed Redis service.
## Results
The system handles our client's requirements comfortably:
- Events arrive at subscribers within milliseconds of ingestion
- Hundreds of simultaneous WebSocket connections work without issue
- Adding new channels requires no code changes
- The same codebase works in development (single instance) and production (multiple instances)
## Key Takeaways
- Use Redis Pub/Sub for real-time fan-out when you don't need message persistence. It's simple, fast, and probably already in your stack.
- Separate connection management from broadcasting — Hold locks briefly for bookkeeping, but release them before doing I/O.
- Handle dead connections gracefully — Network failures happen. Clean up silently without disrupting healthy clients.
- Intersect permissions server-side — Never trust the client to filter their own data. Enforce authorization at the WebSocket layer.
- Design for horizontal scaling from day one — Even if you start with one instance, using a message broker means you can scale out without rewriting.
This post is part of a series on building PulseRelay, a real-time data platform for live event timing systems. See also: Secure API Key Management with HMAC-SHA256 and HTMX + Jinja2: Admin UIs Without JavaScript Frameworks.