几乎所有主流 LLM 服务——ChatGPT、Claude、Gemini——的流式输出都基于同一个协议:Server-Sent Events(SSE)。这个诞生于 2004 年、被写入 HTML5 规范的协议,在 AI 时代迎来了第二春。然而多数开发者对 SSE 的理解仍停留在"比 WebSocket 简单"的层面,对它的断线重连机制、HTTP/2 多路复用优势、以及在高并发场景下的陷阱知之甚少。本文将从协议原理到生产实战,彻底讲透 SSE。
🔌 一、SSE 协议原理与核心机制
SSE 的本质是 HTTP 单向流:客户端发起普通 HTTP 请求,服务端不返回完整响应,而是持续发送 text/event-stream 格式的数据块,直到连接关闭。浏览器原生提供 EventSource API 来消费这种流。
📡 1.1 协议格式详解
SSE 的数据格式极其简洁,每条事件由以下字段组成:
id: 42
event: message
retry: 5000
data: {"content": "Hello, World!"}
data: {"continued": true}
id:事件 ID,断线重连时通过Last-Event-Id头告知服务端event:事件类型,默认为messageretry:建议客户端的重连间隔(毫秒)data:事件数据,支持多行(每行以data:开头)- 空行(
\n\n):标志一条事件结束
💡 提示:
data字段支持多行,但每行都必须以data:前缀。这是初学者最容易犯的错误——直接写多行文本会导致只有第一行被接收。
下面是一个原生 Node.js SSE 服务端的最小实现:
// server.js — 最小 SSE 服务端(Node.js 原生 HTTP)
import http from 'node:http';
const server = http.createServer((req, res) => {
if (req.url !== '/events') {
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('<h1>SSE Server Running</h1>');
return;
}
// SSE 响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
});
let id = 0;
const interval = setInterval(() => {
id++;
const data = JSON.stringify({ time: new Date().toISOString(), id });
// 严格遵循 SSE 格式:id + data + 空行
res.write(`id: ${id}\n`);
res.write(`data: ${data}\n\n`);
}, 1000);
// 客户端断开时清理
req.on('close', () => {
clearInterval(interval);
res.end();
});
});
server.listen(3000, () => console.log('SSE on http://localhost:3000'));
对应的客户端消费代码:
<!-- client.html — EventSource 客户端 -->
<script>
const es = new EventSource('/events');
// 监听默认 message 事件
es.onmessage = (e) => {
const data = JSON.parse(e.data);
console.log(`[${data.id}] ${data.time}`);
};
// 浏览器自动处理断线重连
es.onerror = (e) => {
console.warn('连接异常,浏览器将自动重连...');
};
// 自定义事件类型
es.addEventListener('alert', (e) => {
console.log('收到告警:', e.data);
});
</script>
🔄 1.2 断线重连:SSE 的杀手级特性
这是 SSE 相比 WebSocket 最大的优势之一。EventSource 内置了**自动重连 + 事件溯源(Event Sourcing)**机制:
- 连接断开后,浏览器自动按
retry字段指定的间隔重连 - 重连请求自动携带
Last-Event-Id头 - 服务端根据该 ID 返回断线期间的所有事件,不丢数据
// 服务端:利用 Last-Event-Id 实现断线续传
const server = http.createServer((req, res) => {
if (req.url !== '/events') return;
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// 从客户端断线位置恢复
const lastId = parseInt(req.headers['last-event-id'] || '0', 10);
let currentId = lastId;
// 先补发断线期间的事件
for (let i = lastId + 1; i <= getLatestId(); i++) {
const data = getEventById(i); // 从缓存/数据库获取
res.write(`id: ${i}\ndata: ${JSON.stringify(data)}\n\n`);
}
// 继续发送新事件
const interval = setInterval(() => {
currentId++;
const data = { id: currentId, content: 'new event' };
res.write(`id: ${currentId}\ndata: ${JSON.stringify(data)}\n\n`);
}, 1000);
req.on('close', () => clearInterval(interval));
});
⚠️ **警告:**如果你不设置
id字段,断线重连时客户端无法告知服务端"上次收到哪条",重连后会从头开始接收,导致重复数据。在生产环境中,每条事件必须有唯一且递增的 id。
⚖️ 二、SSE vs WebSocket 深度对比
"该用 SSE 还是 WebSocket?“是我被问得最多的架构选型问题之一。答案取决于你的场景,而不是哪个"更先进”。
📊 2.1 全维度对比
| 维度 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 单向(服务端→客户端) | 全双工 |
| 协议 | HTTP/1.1 或 HTTP/2 | ws:// 或 wss:// |
| 自动重连 | ✅ 浏览器原生支持 | ❌ 需手动实现 |
| 事件溯源 | ✅ Last-Event-Id | ❌ 需手动实现 |
| 负载均衡 | ✅ 标准 HTTP,LB 无感知 | ⚠️ 需要支持 WebSocket 的 LB |
| 代理/CDN 兼容 | ✅ 完美兼容 | ⚠️ 部分代理不支持 Upgrade |
| 二进制数据 | ❌ 仅文本 | ✅ 支持二进制帧 |
| 跨域 | ✅ 标准 CORS | ✅ 自带 Origin 检查 |
| 连接数限制 | 6/域名(HTTP/1.1),HTTP/2 无限制 | 无同源限制 |
| 实现复杂度 | ⭐ 极低 | ⭐⭐⭐ 中等 |
🎯 2.2 选型决策树
我的经验法则:
选 SSE 的场景:
- ✅ LLM/AI 流式输出(token by token)
- ✅ 实时通知、消息推送
- ✅ 实时数据看板(股票行情、监控指标)
- ✅ 进度条更新(文件上传、任务执行)
- ✅ 需要经过企业代理/防火墙的场景
选 WebSocket 的场景:
- ✅ 在线聊天(双向实时)
- ✅ 协同编辑(如 Google Docs)
- ✅ 实时游戏(低延迟双向通信)
- ✅ 需要传输二进制数据(音视频流)
📌 **记住:**如果你只需要服务端向客户端推送数据,SSE 几乎总是更好的选择。它更简单、更可靠、更符合 HTTP 生态。不要因为 WebSocket "更酷"就盲目选择。
🚀 2.3 HTTP/2 下的 SSE 优势
在 HTTP/1.1 下,SSE 有一个广为人知的限制:每个域名最多 6 个并发连接(浏览器限制)。但在 HTTP/2 下,这个问题完全消失——所有请求共享一个 TCP 连接,通过流(Stream)多路复用。
# Nginx 配置:启用 HTTP/2 + SSE
server {
listen 443 ssl http2;
server_name api.example.com;
ssl_certificate /path/to/cert.pem;
ssl_certificate_key /path/to/key.pem;
location /events {
proxy_pass http://backend:3000;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off; # 关键!禁用缓冲
proxy_cache off; # 禁用缓存
chunked_transfer_encoding off; # 禁用分块编码
proxy_read_timeout 86400s; # 长连接超时 24 小时
}
}
⚠️ **警告:**Nginx 默认会缓冲响应(proxy_buffering),这会导致 SSE 事件延迟到达客户端。必须设置
proxy_buffering off。这是 SSE 部署中最常见的坑。
🛠️ 三、多语言生产级实现
🟢 3.1 Node.js + Express:LLM 流式输出
这是最贴近实际开发的场景——对接 OpenAI API,将 LLM 的流式输出转发给前端:
// llm-stream.js — LLM 流式输出代理(Node.js + Express)
import express from 'express';
const app = express();
app.use(express.json());
app.post('/api/chat', async (req, res) => {
const { message } = req.body;
// SSE 响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
try {
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({
model: 'gpt-4o',
messages: [{ role: 'user', content: message }],
stream: true, // 开启流式输出
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留不完整的行
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6);
if (data === '[DONE]') {
res.write('data: [DONE]\n\n');
break;
}
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content;
if (token) {
// 转发给前端客户端
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
} catch {
// 忽略解析错误
}
}
}
} catch (error) {
res.write(`event: error\ndata: ${JSON.stringify({ message: error.message })}\n\n`);
} finally {
res.end();
}
});
app.listen(3000);
前端消费:
// chat-client.js — 前端 LLM 流式消费
async function sendMessage(message) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let fullText = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (data === '[DONE]') return fullText;
try {
const { token } = JSON.parse(data);
fullText += token;
// 实时更新 UI
document.getElementById('output').textContent = fullText;
} catch {}
}
}
return fullText;
}
💡 **提示:**注意上面的前端代码没有使用
EventSource,而是用fetch+ReadableStream。原因是EventSource只支持 GET 请求,而 LLM API 通常需要 POST 请求体。这是 SSE 实战中最容易踩的坑之一。
🔵 3.2 Go:高并发 SSE 服务
Go 的 goroutine 天然适合 SSE 这种长连接场景。下面是一个支持多客户端广播的 SSE 服务:
// main.go — Go SSE 广播服务
package main
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// Event 结构体
type Event struct {
ID int `json:"id"`
Data string `json:"data"`
}
// SSEBroker 管理所有客户端连接
type Broker struct {
clients map[chan Event]bool
mu sync.RWMutex
nextID int
}
func NewBroker() *Broker {
return &Broker{clients: make(map[chan Event]bool)}
}
func (b *Broker) Subscribe() chan Event {
ch := make(chan Event, 64) // 带缓冲,防止慢消费者阻塞
b.mu.Lock()
b.clients[ch] = true
b.mu.Unlock()
return ch
}
func (b *Broker) Unsubscribe(ch chan Event) {
b.mu.Lock()
delete(b.clients, ch)
b.mu.Unlock()
close(ch)
}
func (b *Broker) Broadcast(data string) {
b.mu.RLock()
defer b.mu.RUnlock()
b.nextID++
event := Event{ID: b.nextID, Data: data}
for ch := range b.clients {
select {
case ch <- event:
default:
// 慢消费者,丢弃事件而非阻塞
fmt.Printf("client %p is slow, dropping event %d\n", ch, event.ID)
}
}
}
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// SSE 响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}
ch := b.Subscribe()
defer b.Unsubscribe(ch)
// 从 Last-Event-Id 恢复
lastID := 0
if id := r.Header.Get("Last-Event-Id"); id != "" {
fmt.Sscanf(id, "%d", &lastID)
}
ctx := r.Context()
for {
select {
case event := <-ch:
if event.ID <= lastID {
continue // 跳过已接收的事件
}
payload, _ := json.Marshal(event.Data)
fmt.Fprintf(w, "id: %d\ndata: %s\n\n", event.ID, payload)
flusher.Flush()
case <-ctx.Done():
return
}
}
}
func main() {
broker := NewBroker()
// 每 2 秒广播一次
go func() {
for {
time.Sleep(2 * time.Second)
msg := fmt.Sprintf("server time: %s", time.Now().Format(time.RFC3339))
broker.Broadcast(msg)
}
}()
http.Handle("/events", broker)
http.ListenAndServe(":8080", nil)
}
☕ 3.3 Spring Boot:企业级 SSE 端点
// SseController.java — Spring Boot SSE 端点
package com.example.sse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
@RestController
@RequestMapping("/api")
public class SseController {
private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe() {
// 超时设为 0 表示不超时(生产环境建议设为合理值)
SseEmitter emitter = new SseEmitter(0L);
emitters.add(emitter);
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onTimeout(() -> emitters.remove(emitter));
emitter.onError(e -> emitters.remove(emitter));
// 发送初始连接确认
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("OK")
.id(String.valueOf(System.currentTimeMillis())));
} catch (IOException e) {
emitters.remove(emitter);
}
return emitter;
}
@PostMapping("/broadcast")
public void broadcast(@RequestBody String message) {
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event()
.name("message")
.data(message)
.id(String.valueOf(System.currentTimeMillis())));
} catch (IOException e) {
emitters.remove(emitter);
}
}
}
}
⚠️ **警告:**Spring 的
SseEmitter默认超时是 30 秒。如果不设置new SseEmitter(0L),连接会在 30 秒后自动断开。此外,CopyOnWriteArrayList在客户端数量多(>1000)时性能下降,生产环境应考虑使用 Redis Pub/Sub 做广播。
🏗️ 四、生产环境避坑指南
⚠️ 4.1 六大常见陷阱
经过在多个项目中踩坑总结,以下是 SSE 生产部署最容易出问题的地方:
| 陷阱 | 症状 | 解决方案 |
|---|---|---|
| Nginx 代理缓冲 | 事件延迟 30 秒到达 | proxy_buffering off |
| 不设 ID 字段 | 断线重连丢数据或重复 | 每条事件必须有递增 id |
| EventSource 只支持 GET | 无法发送请求体 | 改用 fetch + ReadableStream |
| HTTP/1.1 连接数限制 | 高并发下新连接被拒 | 升级 HTTP/2 |
| 慢消费者阻塞 | 服务端内存溢出 | 设置缓冲队列,丢弃慢消费者 |
| 心跳缺失 | 代理/LB 因空闲断开连接 | 每 15-30 秒发送 : ping\n\n |
💓 4.2 心跳保活实现
// heartbeat.js — SSE 心跳保活
function setupSSE(req, res) {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
// 每 15 秒发送心跳(注释行,不会触发事件)
const heartbeat = setInterval(() => {
res.write(': ping\n\n'); // 以冒号开头的是注释,客户端忽略
}, 15000);
req.on('close', () => {
clearInterval(heartbeat);
});
}
📌 **记住:**SSE 注释行以冒号(
:)开头,客户端的EventSource会忽略它,但 TCP 连接因此保持活跃。这是防止中间代理(Nginx、CloudFlare、AWS ALB)因空闲超时而断开连接的标准做法。
🔐 4.3 认证方案
SSE 不支持自定义请求头(EventSource 的 API 限制),认证方案有三种:
// 方案一:Cookie 认证(同域推荐)
const es = new EventSource('/events', { withCredentials: true });
// 方案二:URL 参数传递 Token(简单但有日志泄露风险)
const es = new EventSource(`/events?token=${accessToken}`);
// 方案三:先 POST 获取短生命周期 ticket,再用 ticket 建立 SSE
async function connectSSE() {
// 1. POST 获取一次性 ticket
const { ticket } = await fetch('/api/sse-ticket', {
method: 'POST',
headers: { 'Authorization': `Bearer ${token}` },
}).then(r => r.json());
// 2. 用 ticket 建立 SSE 连接
const es = new EventSource(`/events?ticket=${ticket}`);
return es;
}
⚡ **关键结论:**方案三(ticket 模式)是生产环境最安全的选择。Token 只在 POST 请求中传输(HTTPS 加密),不会出现在 URL 日志、浏览器历史或代理日志中。ticket 是一次性的,即使泄露也无法重放。
📝 总结
SSE 不是 WebSocket 的"低配版",而是一个为单向推送场景精心设计的协议。它的自动重连、事件溯源、HTTP 原生兼容性,在很多场景下比 WebSocket 更可靠、更简单。
我的建议:
- 新项目的 LLM 流式输出:首选 SSE + fetch ReadableStream
- 实时通知/看板:首选 SSE,除非需要双向通信
- 聊天/协同编辑:用 WebSocket
- 不确定时:从 SSE 开始,它更容易迁移和调试
相关工具推荐:
- EventSource polyfill — fetch-based SSE 客户端,支持 POST 请求
- Hono — 轻量 Web 框架,内置 SSE 中间件
- Mercure — 基于 SSE 的实时推送协议,支持 Go/PHP/Node.js
- jsjson.com 在线工具 — JSON 格式化、Base64 编解码 助力 SSE 数据调试