🧠 AI System Design

Day 19: Streaming & SSE

Learning Objectives

  • Understand Server-Sent Events (SSE) and WebSocket for streaming
  • Learn how to fan out one model stream to multiple clients
  • Build a streaming proxy

Theory (15 min)

Why Stream?

Without streaming, the client waits for the entire response:

Client: "Write a 500-word essay"
        ──────────────────────────────────────▶ Wait 15 seconds ──▶ Full essay at once

With streaming, the client sees tokens as they're generated:

Client: "Write a 500-word essay"
        ──▶ "Here"──▶ " is"──▶ " your"──▶ " essay"──▶ ...
        ↑ TTFT: 0.5s           ↑ User reads as it arrives

Time-to-First-Token (TTFT) becomes the perceived latency. It depends mainly on prompt processing and queueing rather than on output length, so it is often under 1s even for long outputs.
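To make the TTFT vs total-time distinction concrete, here is a minimal sketch that times a token stream. The `simulated_tokens` generator is a hypothetical stand-in for a real model stream, and the delays are illustrative only.

```python
import time
from typing import Iterable, Iterator


def simulated_tokens(n: int = 5, delay: float = 0.01) -> Iterator[str]:
    """Hypothetical stand-in for a model stream: one token every `delay` seconds."""
    for i in range(n):
        time.sleep(delay)
        yield f" tok{i}"


def measure_stream(tokens: Iterable[str]) -> dict:
    """Consume a token stream and record TTFT and total time, in seconds."""
    start = time.perf_counter()
    ttft = None
    count = 0
    for _ in tokens:
        if ttft is None:
            ttft = time.perf_counter() - start  # the first token has arrived
        count += 1
    total = time.perf_counter() - start
    return {"ttft": ttft, "total": total, "tokens": count}


stats = measure_stream(simulated_tokens())
print(f"TTFT {stats['ttft']:.3f}s, total {stats['total']:.3f}s, {stats['tokens']} tokens")
```

The same `measure_stream` helper could wrap any iterable of tokens, so it should work unchanged once a real stream replaces the simulated one.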

SSE vs WebSocket

Feature                  SSE (Server-Sent Events)       WebSocket
Direction                Server → Client only           Bidirectional
Protocol                 HTTP (simple)                  WS (upgrade)
Reconnection             Automatic (EventSource API)    Manual
Binary data              No                             Yes
Head-of-line blocking    No (chunked)                   No
Complexity               Very low                       Medium

For LLM streaming, SSE is the de facto standard: the OpenAI and Anthropic APIs and open-source servers such as llama.cpp all use it.
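On the wire, an SSE stream is plain text: each event is a group of `field: value` lines ended by a blank line, and lines starting with `:` are comments (often used as keep-alives). The sketch below serializes and parses that framing. It is a simplified illustration that handles only the `event` and `data` fields, and the helper names `format_sse` and `parse_sse` are made up for this example.

```python
import json
from typing import Iterable, Iterator, Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Serialize one event: optional `event:` line, one `data:` line per payload line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.extend(f"data: {part}" for part in data.split("\n"))
    return "\n".join(lines) + "\n\n"  # the blank line terminates the event


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Parse already-decoded lines into events (only `event` and `data` fields)."""
    event, data = "message", []
    for line in lines:
        if line == "":                  # blank line dispatches the buffered event
            if data:
                yield {"event": event, "data": "\n".join(data)}
            event, data = "message", []
        elif line.startswith(":"):      # comment or keep-alive, ignored
            continue
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):   # one leading space after the colon is dropped
                value = value[1:]
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)


wire = (
    format_sse(json.dumps({"text": "Here"}))
    + ": ping\n\n"
    + format_sse("[DONE]", event="done")
)
events = list(parse_sse(wire.split("\n")))
print(events)
```

A production parser would also need the `id` and `retry` fields and incremental decoding of partial chunks, which this sketch leaves out.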

Fan-Out Problem

One model stream ──▶ Multiple clients

Each client wants the same output, but the model produces a single token stream. Three options:

  1. Broadcast: Write tokens to a message queue, all subscribers get them
  2. Buffered replay: Buffer entire output, serve from buffer to late joiners
  3. Per-client stream: One model call per client (expensive, but simplest)
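The first two options can be combined in a small in-process bridge. The sketch below is one possible shape, not a definitive design: a hypothetical `Broadcaster` class keeps the full token history for late joiners and one `asyncio.Queue` per live subscriber. It assumes a single process and a single event loop; a real deployment would more likely put a message queue between the model and the clients.

```python
import asyncio
from typing import AsyncIterator, List, Optional


class Broadcaster:
    """Fans one token stream out to many subscribers, replaying history to late joiners."""

    def __init__(self) -> None:
        self.history: List[str] = []           # buffered replay for late joiners
        self.queues: List[asyncio.Queue] = []  # one queue per live subscriber
        self.done = False

    def publish(self, token: Optional[str]) -> None:
        """Record a token and push it to every subscriber; None marks end of stream."""
        if token is None:
            self.done = True
        else:
            self.history.append(token)
        for q in self.queues:
            q.put_nowait(token)

    async def subscribe(self) -> AsyncIterator[str]:
        """Yield all tokens from the start of the stream, then live ones until the end."""
        q: asyncio.Queue = asyncio.Queue()
        for token in self.history:  # catch-up; no await before registering, so nothing is missed
            q.put_nowait(token)
        if self.done:
            q.put_nowait(None)
        self.queues.append(q)
        try:
            while (token := await q.get()) is not None:
                yield token
        finally:
            self.queues.remove(q)   # runs when the subscriber finishes or is closed


async def model(b: Broadcaster) -> None:
    """Simulated model: publishes four tokens, then the end-of-stream marker."""
    for tok in ["Here", " is", " your", " essay"]:
        b.publish(tok)
        await asyncio.sleep(0.01)
    b.publish(None)


async def client(b: Broadcaster, join_delay: float) -> str:
    await asyncio.sleep(join_delay)
    return "".join([tok async for tok in b.subscribe()])


async def main():
    b = Broadcaster()
    # Clients join at the start, mid-stream, and after the stream has ended.
    results = await asyncio.gather(model(b), client(b, 0), client(b, 0.025), client(b, 0.1))
    return results[1:], len(b.queues)


outputs, live_subscribers = asyncio.run(main())
print(outputs, live_subscribers)
```

The per-subscriber queues here are unbounded, so a slow client never slows the model but can grow memory; bounding them forces a choice between dropping tokens and disconnecting the slow client.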

Hands-on (15 min)

Build a Streaming SSE Proxy

#!/usr/bin/env python3
"""streaming-proxy.py: simulated SSE endpoint plus a llama.cpp streaming client.

Fan-out from one model stream to many clients is not implemented yet (see the stub list).
"""
import asyncio
import json
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import httpx

# Stub. Ayva will expand with:
# - Real SSE protocol (text/event-stream content-type)
# - WebSocket support (websockets library)
# - Fan-out: one model call, N subscribers
# - Late-joiner buffer (catch-up for clients that connect mid-stream)
# - Backpressure handling (slow client doesn't slow model)
# - Token-level timing metrics
# - Integration with the AI Gateway from Day 7

LLM_URL = "http://localhost:8080/v1/completions"

class StreamHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_len = int(self.headers.get("Content-Length", 0))
        try:
            body = json.loads(self.rfile.read(content_len) or b"{}")
        except json.JSONDecodeError:
            self.send_error(400, "Request body must be JSON")
            return
        prompt = body.get("prompt", "")  # unused until the real upstream call is wired in

        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        # The body has no Content-Length, so closing the connection marks the end
        # of the stream. With http.server, "keep-alive" here would make the handler
        # wait for another request and leave the client hanging.
        self.send_header("Connection", "close")
        self.end_headers()

        # Simulated streaming (replace with real SSE from llama.cpp)
        words = [
            "Here", " is", " your", " streaming", " response.",
            " Each", " word", " arrives", " one", " at", " a", " time.",
        ]

        try:
            for word in words:
                msg = json.dumps({"choices": [{"text": word, "finish_reason": None}]})
                self.wfile.write(f"data: {msg}\n\n".encode())
                self.wfile.flush()
                time.sleep(0.15)

            # Done signal, then the OpenAI-style [DONE] sentinel that
            # stream_from_llama() below checks for
            done = json.dumps({"choices": [{"text": "", "finish_reason": "stop"}]})
            self.wfile.write(f"data: {done}\n\ndata: [DONE]\n\n".encode())
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            # Client disconnected mid-stream: stop writing and let the handler exit.
            pass

    def log_message(self, format, *args):
        pass  # silence per-request logging


# Real streaming client example
async def stream_from_llama():
    """Read a streaming response from llama.cpp."""
    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST", LLM_URL,
            json={"prompt": "Write a haiku about AI.", "max_tokens": 50, "stream": True},
        ) as resp:
            resp.raise_for_status()
            print("Streaming response:")
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    try:
                        payload = json.loads(data)
                        text = payload["choices"][0].get("text", "")
                        print(text, end="", flush=True)
                    except json.JSONDecodeError:
                        pass  # skip malformed chunks
            print()


if __name__ == "__main__":
    print("Starting streaming proxy on :9010")
    # Threaded server so one slow stream does not block other clients
    server = ThreadingHTTPServer(("0.0.0.0", 9010), StreamHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()

    # Also test real streaming; skip it if no llama.cpp server is reachable
    try:
        asyncio.run(stream_from_llama())
    except httpx.HTTPError as exc:
        print(f"Upstream streaming test skipped: {exc}")

    # Keep the process alive; otherwise the daemon server thread dies with it
    try:
        thread.join()
    except KeyboardInterrupt:
        server.shutdown()

Test it:

# 1. Start the proxy
python3 /tmp/streaming-proxy.py &

# 2. Test with curl (shows tokens as they arrive)
curl -N http://localhost:9010/v1/completions \
  -H "Content-Type: application/json" \
  -d '{"prompt":"Hello","max_tokens":20}'

Questions for Ayva:

  • How does llama.cpp's streaming mode (the "stream": true request field used above) work internally?
  • What's the best fan-out architecture for high-concurrency streaming?
  • How do you handle client disconnection mid-stream without leaking resources?
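One way to approach the disconnection question, sketched for the simple one-upstream-per-client case: relay tokens inside a `try/finally` and close the upstream generator as soon as a write to the client fails. The `upstream`, `relay`, and `flaky_send` names are hypothetical, and the upstream here is simulated rather than a real model call.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, Callable, List

closed_upstreams: List[str] = []  # records cleanup so the demo can show it happened


async def upstream(name: str) -> AsyncGenerator[str, None]:
    """Simulated model stream; the finally block stands in for releasing the model call."""
    try:
        for i in range(100):
            await asyncio.sleep(0)
            yield f"tok{i}"
    finally:
        closed_upstreams.append(name)


async def relay(
    tokens: AsyncGenerator[str, None],
    send: Callable[[str], Awaitable[None]],
) -> int:
    """Forward tokens until the stream ends or the client goes away; always close upstream."""
    sent = 0
    try:
        async for tok in tokens:
            await send(tok)
            sent += 1
    except ConnectionError:
        pass  # client disconnected mid-stream (BrokenPipeError, ConnectionResetError, ...)
    finally:
        await tokens.aclose()  # runs the upstream's finally block right away
    return sent


async def flaky_send(tok: str) -> None:
    """Simulated client socket that breaks when the fourth token is written."""
    if tok == "tok3":
        raise BrokenPipeError("client went away")


sent = asyncio.run(relay(upstream("req-1"), flaky_send))
print(sent, closed_upstreams)
```

With fan-out the rule changes: a single client leaving should only remove that subscriber, and the upstream should be closed when the last subscriber is gone.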


Key Takeaways

  • Streaming improves perceived latency dramatically (TTFT vs total time)
  • SSE is the standard protocol for LLM streaming (simple, HTTP-native)
  • Fan-out requires a pub/sub bridge between one model stream and many clients
  • Always clean up streams on client disconnect to avoid resource leaks
