当 OpenAI 在 2022 年末让 ChatGPT 以「打字机」效果流式输出回答时,背后的技术并非 WebSocket——而是一个诞生于 2006 年、沉寂多年的 Web 标准:Server-Sent Events(SSE)。如今,从 Claude 到 Gemini,从 Copilot 到 Cursor,几乎所有 AI 产品都在用 SSE 做流式传输。但大多数开发者对 SSE 的理解还停留在 EventSource 三行代码的层面。本文将从协议原理到生产落地,带你彻底搞懂这个被低估的实时通信方案。
🔌 一、SSE 原理与三种实时通信方案对比
1.1 SSE 到底是什么?
SSE 是 HTML5 规范的一部分(WHATWG Streams API),核心机制非常简单:客户端发起一个普通 HTTP 请求,服务端不关闭连接,持续以 text/event-stream 格式推送数据。每条消息由一个或多个 field: value 行组成,用空行分隔。
data: {"token": "你"}
data: {"token": "好"}
data: [DONE]
与 WebSocket 的关键区别在于:SSE 是单向的(服务端→客户端),基于普通 HTTP/1.1 或 HTTP/2,天然支持自动重连、Last-Event-ID 恢复、以及所有现有的 HTTP 基础设施(代理、CDN、负载均衡)。
1.2 三种实时方案深度对比
| 维度 | SSE | WebSocket | Long Polling |
|---|---|---|---|
| 方向 | 单向(服务端→客户端) | 全双工 | 模拟双向 |
| 协议 | HTTP/1.1 或 HTTP/2 | 独立协议(ws://) | HTTP/1.1 |
| 自动重连 | ✅ 内置 | ❌ 需手动实现 | ❌ 需手动实现 |
| 断点续传 | ✅ Last-Event-ID | ❌ 需自行实现 | ❌ 不支持 |
| 二进制数据 | ❌ 仅文本 | ✅ 原生支持 | ❌ 仅文本 |
| 代理/CDN 兼容 | ✅ 完全兼容 | ⚠️ 需特殊配置 | ✅ 完全兼容 |
| HTTP/2 多路复用 | ✅ 共享连接 | ❌ 独立连接 | ✅ 共享连接 |
| 最大并发连接 | 浏览器限制 6/域* | 无限制 | 浏览器限制 6/域 |
| 典型延迟 | < 50ms | < 20ms | 100-500ms |
| 实现复杂度 | ⭐ 低 | ⭐⭐⭐ 高 | ⭐⭐ 中 |
⚠️ **警告:**浏览器对同一域名的 HTTP/1.1 连接数限制为 6 个。SSE 和 Long Polling 都受此限制。但使用 HTTP/2 时,所有请求共享一个 TCP 连接的多路复用流,6 连接限制不再成为瓶颈。
📌 记住:如果你的场景是服务端向客户端推送数据(AI 流式输出、通知推送、实时日志、股票行情),SSE 几乎总是比 WebSocket 更好的选择——更简单、更可靠、更省资源。只有当你需要真正的双向实时通信(聊天室、协同编辑、游戏)时,才应该选 WebSocket。
1.3 为什么 AI 时代 SSE 重新崛起?
三个关键原因:
- 流式 Token 输出天然适配单向推送:LLM 生成文本时,服务端只需不断推送 token,客户端无需发送数据
- HTTP/2 多路复用消除了并发限制:6 连接限制在 HTTP/2 下不再是问题
- 基础设施兼容性:企业网络、反向代理、WAF 对 WebSocket 的支持参差不齐,但 SSE 作为普通 HTTP 请求,走遍天下
⚡ **关键结论:**SSE 不是 WebSocket 的「阉割版」,而是在单向推送场景下更优雅、更工程化的方案。
🚀 二、生产级 SSE 服务端实现
2.1 原生 Node.js 实现
先从零实现一个 SSE 服务端,理解底层协议细节:
// Node.js 原生 SSE 服务端 —— 不依赖任何框架
import http from 'node:http';
const clients = new Set();
const server = http.createServer((req, res) => {
if (req.url === '/events') {
// 设置 SSE 响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache', // 禁止缓存
'Connection': 'keep-alive', // 保持连接
'X-Accel-Buffering': 'no', // 关键!禁用 Nginx 缓冲
'Access-Control-Allow-Origin': '*', // CORS
});
// 发送初始注释,强制连接建立
res.write(':ok\n\n');
// 注册客户端
clients.add(res);
console.log(`客户端已连接,当前在线: ${clients.size}`);
// 心跳:每 30 秒发送一次,防止代理/防火墙超时断开
const heartbeat = setInterval(() => {
res.write(':heartbeat\n\n'); // 注释行不会触发 onmessage
}, 30000);
// 客户端断开时清理
req.on('close', () => {
clients.delete(res);
clearInterval(heartbeat);
console.log(`客户端断开,当前在线: ${clients.size}`);
});
}
});
// 广播函数
function broadcast(event, data) {
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
for (const client of clients) {
client.write(message);
}
}
// 模拟每秒推送一条消息
setInterval(() => {
broadcast('tick', { time: new Date().toISOString(), users: clients.size });
}, 1000);
server.listen(3001, () => console.log('SSE 服务运行在 :3001'));
这段代码有几个生产级要点:X-Accel-Buffering: no 头部防止 Nginx 缓冲响应导致消息延迟;心跳机制防止中间设备因空闲超时而断开连接;Set 数据结构管理活跃客户端,断开时自动清理。
2.2 Nuxt 3 / Nitro 服务端实现
如果你的项目像 jsjson.com 一样使用 Nuxt 3,可以直接在 Server Routes 中实现 SSE。Nitro 底层使用 H3 框架,处理 SSE 需要拿到原始的 Node.js Response 对象:
// server/api/chat-stream.post.ts — Nuxt 3 Server Route 实现 LLM 流式输出
import { defineEventHandler, readBody, setResponseHeaders } from 'h3';
export default defineEventHandler(async (event) => {
const { message } = await readBody(event);
const res = event.node.res;
// 设置 SSE 头部
setResponseHeaders(event, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
// 模拟 LLM 流式输出(替换为真实 API 调用)
const tokens = [
'你好', '!', '我', '是', 'AI', '助手', '。',
'很', '高兴', '为你', '服务', '。',
];
for (const token of tokens) {
res.write(`data: ${JSON.stringify({ token, done: false })}\n\n`);
await new Promise(resolve => setTimeout(resolve, 80)); // 模拟生成延迟
}
res.write(`data: ${JSON.stringify({ token: '', done: true })}\n\n`);
res.end();
});
💡 **提示:**Nuxt 3 的 Server Routes 默认会缓冲响应。确保调用
event.node.res.write()直接写入 Node.js 响应流,而不是返回一个普通对象。
2.3 接入真实 LLM API(OpenAI 兼容接口)
将上面的模拟代码替换为真实的 OpenAI API 调用,实现一个生产级的 AI 流式代理:
// server/api/chat-stream.post.ts — 真实 OpenAI 流式代理
import { defineEventHandler, readBody, setResponseHeaders } from 'h3';
export default defineEventHandler(async (event) => {
const { messages, model = 'gpt-4o' } = await readBody(event);
const res = event.node.res;
setResponseHeaders(event, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no',
});
const response = await fetch('https://api.openai.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
},
body: JSON.stringify({ model, messages, stream: true }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 保留不完整的行
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (data === '[DONE]') {
res.write('data: [DONE]\n\n');
res.end();
return;
}
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content;
if (token) {
res.write(`data: ${JSON.stringify({ token })}\n\n`);
}
} catch { /* 跳过解析失败的行 */ }
}
}
res.end();
});
这段代码展示了 SSE 的核心优势:逐 chunk 流式传输。服务端从 OpenAI API 读取数据流,解析出每个 token,立即通过 SSE 推送给客户端,整个过程中客户端保持连接不断开。
🛡️ 三、客户端实现与生产级陷阱
3.1 基础客户端:EventSource 的局限
浏览器原生的 EventSource API 虽然简单,但在生产环境中有一个致命缺陷:不支持 POST 请求和自定义请求头。这意味着你无法用它发送请求体或携带 Authorization Token。
// ❌ 错误写法:原生 EventSource 无法 POST 和携带认证头
const es = new EventSource('/api/chat-stream');
es.onmessage = (e) => console.log(e.data);
// 无法发送 { message: "你好" } 请求体
// 无法设置 Authorization: Bearer xxx 头
3.2 生产级客户端:fetch + ReadableStream
正确的做法是用 fetch API 手动解析 SSE 流,这也是所有主流 AI SDK 的实现方式:
// ✅ 正确写法:fetch 手动解析 SSE 流,支持 POST + 自定义头部
async function streamChat(message, onToken, onDone) {
const response = await fetch('/api/chat-stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${getToken()}`, // 支持认证
},
body: JSON.stringify({
messages: [{ role: 'user', content: message }],
}),
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6).trim();
if (data === '[DONE]') {
onDone();
return;
}
try {
const parsed = JSON.parse(data);
onToken(parsed.token);
} catch { /* 忽略解析错误 */ }
}
}
}
onDone();
}
// 使用方式
let result = '';
await streamChat('你好',
(token) => { result += token; updateUI(result); },
() => console.log('流式输出完成')
);
3.3 ⚠️ 生产环境避坑指南
这些是我在生产中踩过的坑,每一个都可能导致诡异的线上问题:
坑点 1:Nginx 代理缓冲
Nginx 默认会缓冲 upstream 响应,导致 SSE 消息不会实时推送到客户端,而是一次性发送所有缓存的数据。
# nginx.conf — 正确的 SSE 代理配置
location /api/chat-stream {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
proxy_set_header Connection ''; # 清除 Connection 头
proxy_buffering off; # 关键!禁用代理缓冲
proxy_cache off; # 禁用缓存
chunked_transfer_encoding off; # 禁用分块传输
proxy_read_timeout 300s; # 5 分钟超时,适配长对话
}
坑点 2:客户端 AbortController 未清理
用户切换页面或关闭对话时,如果不主动中止 fetch 请求,连接会一直占用资源。使用 AbortController 可以优雅地取消流式传输:
// 客户端中止控制器 —— 防止连接泄漏
let currentController = null;
async function streamChatWithAbort(message, onToken) {
// 中止上一个未完成的请求
if (currentController) currentController.abort();
currentController = new AbortController();
try {
const response = await fetch('/api/chat-stream', {
method: 'POST',
signal: currentController.signal, // 传入中止信号
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message }),
});
// ... 解析流的逻辑同上
} catch (err) {
if (err.name === 'AbortError') {
console.log('用户主动取消');
return;
}
throw err;
}
}
// 页面卸载时清理
window.addEventListener('beforeunload', () => {
if (currentController) currentController.abort();
});
坑点 3:服务端连接数耗尽
每个 SSE 连接都是一个长时间存活的 HTTP 连接。如果不限制最大连接数,恶意用户可以轻松耗尽你的服务器资源。必须设置连接上限:
// 服务端连接数限制 —— 防止资源耗尽
const MAX_CLIENTS = 1000;
const clients = new Map(); // key: clientId, value: response
function handleSSE(req, res) {
if (clients.size >= MAX_CLIENTS) {
res.writeHead(429, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: '服务器繁忙,请稍后重试' }));
return;
}
const clientId = crypto.randomUUID();
clients.set(clientId, res);
// 设置单连接超时:5 分钟无活动自动断开
const timeout = setTimeout(() => {
res.write('event: timeout\ndata: {"reason":"idle timeout"}\n\n');
res.end();
clients.delete(clientId);
}, 300_000);
req.on('data', () => clearTimeout(timeout)); // 有数据活动时重置定时器
req.on('close', () => {
clearTimeout(timeout);
clients.delete(clientId);
});
}
⚠️ **警告:**Node.js 默认最大 socket 数约为几千个。如果 SSE 连接数超过 1000,需要调整
server.maxConnections和操作系统级的文件描述符限制(ulimit -n)。高并发场景建议使用专门的消息队列(如 Redis Pub/Sub)做多实例广播。
📊 四、完整实战:构建 SSE 连接管理器
将上述所有最佳实践封装成一个可复用的连接管理器,同时处理重连、心跳、认证和背压:
// sse-client.js — 生产级 SSE 客户端封装
class SSEClient {
#controller = null;
#retryCount = 0;
#maxRetries = 5;
#baseDelay = 1000;
constructor(url, options = {}) {
this.url = url;
this.headers = options.headers || {};
this.onMessage = options.onMessage || (() => {});
this.onError = options.onError || console.error;
this.onOpen = options.onOpen || (() => {});
this.maxRetries = options.maxRetries ?? 5;
}
async connect(body = {}) {
this.#controller = new AbortController();
try {
const response = await fetch(this.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...this.headers,
},
body: JSON.stringify(body),
signal: this.#controller.signal,
});
if (!response.ok) {
if (response.status === 429) {
const retryAfter = response.headers.get('Retry-After') || 5;
this.#scheduleRetry(body, retryAfter * 1000);
return;
}
throw new Error(`SSE 连接失败: HTTP ${response.status}`);
}
this.#retryCount = 0; // 连接成功,重置重试计数
this.onOpen();
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const parts = buffer.split('\n\n');
buffer = parts.pop() || '';
for (const part of parts) {
this.#parseMessage(part);
}
}
} catch (err) {
if (err.name === 'AbortError') return;
this.#scheduleRetry(body);
}
}
#parseMessage(raw) {
let event = 'message';
let data = '';
for (const line of raw.split('\n')) {
if (line.startsWith('event: ')) event = line.slice(7);
if (line.startsWith('data: ')) data += line.slice(6);
}
if (data === '[DONE]') return;
try {
this.onMessage({ event, data: JSON.parse(data) });
} catch {
this.onMessage({ event, data });
}
}
#scheduleRetry(body, delay) {
if (this.#retryCount >= this.#maxRetries) {
this.onError(new Error('超过最大重试次数'));
return;
}
const waitTime = delay || this.#baseDelay * Math.pow(2, this.#retryCount);
this.#retryCount++;
console.log(`SSE 将在 ${waitTime}ms 后重连 (第 ${this.#retryCount} 次)`);
setTimeout(() => this.connect(body), waitTime);
}
close() {
this.#controller?.abort();
}
}
// 使用示例
const client = new SSEClient('/api/chat-stream', {
headers: { Authorization: 'Bearer xxx' },
onMessage: ({ event, data }) => {
if (event === 'token') appendToUI(data.token);
},
onError: (err) => showToast(err.message),
});
client.connect({ message: '你好' });
这个封装处理了生产环境中最关键的几个问题:指数退避重连(避免网络抖动时疯狂重试)、429 限流响应处理、连接中止与资源清理、以及二进制 buffer 拼接导致的消息边界问题。
💡 **提示:**指数退避(Exponential Backoff)的计算公式是
baseDelay × 2^retryCount,即第 1 次等 1 秒,第 2 次等 2 秒,第 3 次等 4 秒。这避免了大量客户端同时重连导致的「雷群效应」(Thundering Herd)。
🔑 五、总结与技术选型建议
SSE 在 AI 时代焕发出新的生命力,但它并非银弹。以下是明确的选型决策框架:
✅ **选择 SSE 的场景:**AI/LLM 流式输出、实时通知推送、日志流、股票行情、进度更新、服务端事件广播
❌ **不选 SSE 的场景:**双向实时通信(聊天室)、高频交互(游戏)、二进制数据传输(音视频)、需要低延迟双向推送(协同编辑)
💰 **成本优势:**SSE 基于普通 HTTP 连接,无需独立的 WebSocket 网关,CDN 和负载均衡器零配置支持。对于大多数推送场景,SSE 的基础设施成本比 WebSocket 低 30-50%。
相关工具推荐:
- 🔧 EventSource polyfill:
eventsource-polyfill— 兼容旧浏览器和 Node.js - 🔧 H3/Nitro:Nuxt 3 的服务端框架,原生支持 Node.js Response 流操作
- 🔧 PartyKit:基于 Cloudflare Workers 的实时应用框架,内置 SSE 支持
- 🔧 Mercure:基于 SSE 的实时推送协议,自带 Hub 和授权机制
- 🔧 json-formatter:开发调试 SSE 时,用 jsjson.com/json-format 格式化
data字段中的 JSON 数据,提升调试效率
SSE 的美在于它的简单——一个 HTTP 连接,一种文本格式,一套标准 API。在过度工程化的今天,选择最简单的可行方案,往往才是最好的工程决策。