Python PHP

Building Real-Time WebSocket Streaming with Redis Pub/Sub and FastAPI

Feb 23, 2026 3 min read

How we architected a scalable event broadcast system that delivers data to subscribers the instant it arrives.

When we needed to build a real-time data platform for a live event timing company, the requirements were clear: events arrive from timing hardware, and subscribers — broadcast systems, mobile apps, live displays — need to see that data immediately. Not in 5 seconds. Not after a polling interval. Instantly.

This post walks through how we designed the real-time streaming layer using FastAPI's native WebSocket support and Redis Pub/Sub as the broadcast backbone.

The Problem

Our platform ingests events from timing hardware via HTTP POST. Multiple clients need to receive those events in real-time via WebSocket connections. The challenge:

  1. Fan-out — One incoming event might need to go to dozens of connected subscribers.
  2. Scalability — We can't assume all WebSocket connections live on the same server instance.
  3. Authorization — Subscribers should only receive events for channels they're permitted to see.
  4. Reliability — Dead connections shouldn't block broadcasts to healthy ones.

A naive approach — keeping a list of WebSocket connections in memory and iterating through them — breaks down the moment you scale beyond a single server. You need a message broker.

Why Redis Pub/Sub?

We considered several options:

Option Pros Cons
In-memory broadcast Simple, fast Single-server only, no horizontal scaling
PostgreSQL LISTEN/NOTIFY Already have Postgres Limited throughput, not designed for high fan-out
RabbitMQ / Kafka Battle-tested, durable Operational overhead, overkill for our use case
Redis Pub/Sub Fast, simple, already using Redis No persistence (fine for real-time)

Redis was already in our stack for rate limiting and API key validation caching. Adding Pub/Sub meant zero additional infrastructure. And for real-time streaming where we don't need message persistence (if you miss it, it's gone), Redis Pub/Sub is ideal.

The Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Device    │────▶│  POST /v1/  │────▶│  PostgreSQL │
│  (Publisher)│     │   events    │     │  (storage)  │
└─────────────┘     └──────┬──────┘     └─────────────┘
                          │
                          │ PUBLISH
                          ▼
                   ┌─────────────┐
                   │    Redis    │
                   │   Pub/Sub   │
                   └──────┬──────┘
                          │
            ┌─────────────┼─────────────┐
            │ SUBSCRIBE   │ SUBSCRIBE   │ SUBSCRIBE
            ▼             ▼             ▼
     ┌───────────┐ ┌───────────┐ ┌───────────┐
     │ WebSocket │ │ WebSocket │ │ WebSocket │
     │  Client 1 │ │  Client 2 │ │  Client 3 │
     └───────────┘ └───────────┘ └───────────┘

When an event is ingested:

  1. Store it in PostgreSQL (durability, historical queries)
  2. Publish it to Redis on a channel-specific topic
  3. All server instances subscribed to that topic receive the message
  4. Each instance broadcasts to its connected WebSocket clients

Implementation

We built a WebSocketManager class that handles connection lifecycle and message distribution:

class WebSocketManager:
    """Manages WebSocket connections and broadcasts."""
    
    def __init__(self, redis: Redis) -> None:
        self._redis = redis
        self._connections: dict[str, set[WebSocket]] = defaultdict(set)
        self._lock = asyncio.Lock()
    
    async def connect(self, websocket: WebSocket, channels: list[str]) -> None:
        """Register a WebSocket for specific channels."""
        await websocket.accept()
        async with self._lock:
            for channel in channels:
                self._connections[channel].add(websocket)
    
    async def disconnect(self, websocket: WebSocket) -> None:
        """Remove a WebSocket from all channels."""
        async with self._lock:
            for channel_connections in self._connections.values():
                channel_connections.discard(websocket)
    
    async def broadcast(self, channel: str, message: dict) -> None:
        """Send a message to all connections subscribed to a channel."""
        async with self._lock:
            connections = self._connections.get(channel, set()).copy()
        
        # Broadcast outside the lock to avoid blocking
        dead_connections = []
        for websocket in connections:
            try:
                await websocket.send_json(message)
            except Exception:
                dead_connections.append(websocket)
        
        # Clean up dead connections
        if dead_connections:
            async with self._lock:
                for ws in dead_connections:
                    for channel_connections in self._connections.values():
                        channel_connections.discard(ws)

Key decisions:

  • Lock for connection management — Adding/removing connections must be thread-safe. We use an asyncio.Lock to prevent race conditions.
  • Copy before broadcast — We copy the connection set before iterating, so we're not holding the lock during potentially slow network operations.
  • Silent failure handling — If a send fails, we mark the connection as dead and clean it up. No exceptions bubble up to disrupt other broadcasts.
The Redis Subscriber

A background task listens to Redis and triggers broadcasts:

async def redis_subscriber(manager: WebSocketManager, redis: Redis) -> None:
    """Listen to Redis Pub/Sub and broadcast to WebSocket clients."""
    pubsub = redis.pubsub()
    await pubsub.psubscribe("events:*")  # Subscribe to all event channels
    
    async for message in pubsub.listen():
        if message["type"] != "pmessage":
            continue
        
        # Extract channel name from "events:channel-name"
        channel = message["channel"].decode().split(":", 1)[1]
        data = json.loads(message["data"])
        
        await manager.broadcast(channel, data)

We use psubscribe (pattern subscribe) with events:* so we don't need to know all channels upfront. Any new channel automatically gets picked up.

The WebSocket Endpoint
@router.websocket("/ws/stream")
async def websocket_stream(
    websocket: WebSocket,
    token: str = Query(...),
    channels: list[str] = Query(default=[]),
) -> None:
    """Real-time event stream via WebSocket."""
    # Validate API key and get permissions
    principal = await auth_service.validate_key(token)
    if not principal or principal.role not in (Role.ADMIN, Role.SUBSCRIBER):
        await websocket.close(code=4003)
        return
    
    # Determine which channels this client can access
    if principal.role == Role.ADMIN:
        allowed_channels = channels or ["*"]  # Admin sees all
    else:
        # Intersection of requested and permitted channels
        permitted = set(principal.channels)
        requested = set(channels) if channels else permitted
        allowed_channels = list(permitted & requested)
    
    if not allowed_channels:
        await websocket.close(code=4003)
        return
    
    # Connect and wait for disconnect
    await manager.connect(websocket, allowed_channels)
    try:
        while True:
            # Keep connection alive, handle pings
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        await manager.disconnect(websocket)

The authorization logic ensures:

  • Only admin and subscriber roles can connect
  • Subscribers only receive events for channels they're permitted to see
  • If a subscriber requests specific channels, we intersect with their permissions
Publishing Events

When an event is ingested, we publish to Redis after storing:

async def ingest_event(event: EventCreate, principal: Principal) -> EventResponse:
    """Ingest an event and broadcast to subscribers."""
    # Store in database
    stored_event = await event_repo.upsert(event)
    
    # Broadcast via Redis
    await redis.publish(
        f"events:{event.channel}",
        stored_event.model_dump_json()
    )
    
    return stored_event

Why This Scales

This architecture scales horizontally:

  1. Multiple API instances — Each instance subscribes to Redis. When an event is published, all instances receive it and broadcast to their local WebSocket connections.
  2. No session affinity required — A client can connect to any instance. Redis ensures they receive events regardless of which instance ingested them.
  3. Redis handles fan-out — We don't implement our own message routing. Redis Pub/Sub does the heavy lifting.
  4. Graceful degradation — If a WebSocket connection dies mid-broadcast, we clean it up without affecting others.

Trade-offs

This approach has trade-offs worth understanding:

No persistence — Redis Pub/Sub is fire-and-forget. If a subscriber disconnects and reconnects, they miss events that happened in between. For our use case (live event data), this is acceptable. For other use cases, you might need Redis Streams or a proper message queue.

Memory pressure — Every connected WebSocket holds memory. At massive scale (tens of thousands of connections), you'd want to monitor memory and potentially implement connection limits.

Single Redis instance — For this deployment, one Redis instance is fine. At larger scale, you'd consider Redis Cluster or a managed Redis service.

Results

The system handles our client's requirements comfortably:

  • Events arrive at subscribers within milliseconds of ingestion
  • Hundreds of simultaneous WebSocket connections work without issue
  • Adding new channels requires no code changes
  • The same codebase works in development (single instance) and production (multiple instances)

Key Takeaways

  1. Use Redis Pub/Sub for real-time fan-out when you don't need message persistence. It's simple, fast, and probably already in your stack.
  2. Separate connection management from broadcasting — Hold locks briefly for bookkeeping, but release them before doing I/O.
  3. Handle dead connections gracefully — Network failures happen. Clean up silently without disrupting healthy clients.
  4. Intersect permissions server-side — Never trust the client to filter their own data. Enforce authorization at the WebSocket layer.
  5. Design for horizontal scaling from day one — Even if you start with one instance, using a message broker means you can scale out without rewriting.

This post is part of a series on building PulseRelay, a real-time data platform for live event timing systems. See also: Secure API Key Management with HMAC-SHA256 and HTMX + Jinja2: Admin UIs Without JavaScript Frameworks.