2025 年至今,几乎所有的 AI 大模型 API(OpenAI、Claude、Gemini、国产大模型)都采用 Server-Sent Events(SSE)作为流式输出的默认协议。据统计,超过 90% 的 LLM API 调用使用流式模式,而 SSE 正是这背后的传输基石。如果你还在用轮询或长连接来处理实时数据,那这篇文章会让你彻底改变认知——SSE 的简洁性和可靠性远超大多数开发者的想象。
🔍 一、SSE 核心原理与技术细节
Server-Sent Events 是 W3C 标准(HTML5 规范的一部分),基于 HTTP/1.1 的长连接,实现服务器到客户端的单向实时推送。与 WebSocket 不同,SSE 天然支持自动重连和事件 ID 追踪,这在生产环境中是巨大的优势。
📋 协议格式详解
SSE 的数据格式非常简洁,基于纯文本的 text/event-stream MIME 类型。每条消息由一个或多个字段组成,字段之间用 \n 分隔,消息之间用 \n\n 分隔。
// SSE 消息格式
data: 第一行数据\n
data: 第二行数据\n
event: message-type\n
id: 12345\n
retry: 5000\n
\n
四个核心字段的含义:
| 字段 | 作用 | 是否必须 | 说明 |
|---|---|---|---|
data |
消息内容 | ✅ 必须 | 多行用多个 data:,客户端用 \n 拼接 |
event |
事件类型 | ❌ 可选 | 默认为 message,客户端按类型监听 |
id |
事件 ID | ❌ 可选 | 断线重连时通过 Last-Event-ID 请求头回传 |
retry |
重连间隔 | ❌ 可选 | 单位为毫秒,客户端自动采用 |
📋 完整的服务端实现(Node.js)
以下是一个生产级的 SSE 服务端实现,包含心跳保活、客户端管理和错误处理:
// Node.js 原生实现 SSE 服务端(无需框架依赖)
const http = require('http');
class SSEManager {
constructor() {
// 存储所有活跃的客户端连接
this.clients = new Map();
this.heartbeatInterval = 30000; // 30 秒心跳
this._startHeartbeat();
}
// 处理新的 SSE 连接
handleConnection(req, res) {
// 设置 SSE 必需的响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache', // 禁止缓存
'Connection': 'keep-alive', // 保持连接
'X-Accel-Buffering': 'no', // 禁用 Nginx 缓冲
'Access-Control-Allow-Origin': '*', // CORS
});
// 从 URL 或 Header 中提取客户端 ID
const clientId = req.headers['x-client-id'] || crypto.randomUUID();
// 注册客户端
this.clients.set(clientId, { res, connectedAt: Date.now() });
console.log(`客户端 ${clientId} 已连接,当前在线: ${this.clients.size}`);
// 支持断线重连:读取 Last-Event-ID
const lastEventId = req.headers['last-event-id'];
if (lastEventId) {
console.log(`客户端 ${clientId} 从事件 ${lastEventId} 恢复`);
// 这里可以重放 lastEventId 之后的事件
}
// 发送初始连接确认
this.send(clientId, { type: 'connected', clientId });
// 监听客户端断开
req.on('close', () => {
this.clients.delete(clientId);
console.log(`客户端 ${clientId} 已断开,当前在线: ${this.clients.size}`);
});
req.on('error', () => {
this.clients.delete(clientId);
});
}
// 向指定客户端发送事件
send(clientId, data, eventType = 'message', eventId = null) {
const client = this.clients.get(clientId);
if (!client) return false;
let message = '';
if (eventId) message += `id: ${eventId}\n`;
if (eventType !== 'message') message += `event: ${eventType}\n`;
// 支持多行 data
const dataStr = typeof data === 'string' ? data : JSON.stringify(data);
dataStr.split('\n').forEach(line => {
message += `data: ${line}\n`;
});
message += '\n'; // 消息结尾的空行
try {
client.res.write(message);
return true;
} catch (err) {
this.clients.delete(clientId);
return false;
}
}
// 广播给所有客户端
broadcast(data, eventType = 'message') {
let sent = 0;
for (const [clientId] of this.clients) {
if (this.send(clientId, data, eventType)) sent++;
}
return sent;
}
// 心跳保活,防止代理/防火墙断开空闲连接
_startHeartbeat() {
setInterval(() => {
for (const [clientId, client] of this.clients) {
try {
client.res.write(': heartbeat\n\n'); // 注释行作为心跳
} catch (err) {
this.clients.delete(clientId);
}
}
}, this.heartbeatInterval);
}
}
// 启动服务
const sse = new SSEManager();
const server = http.createServer((req, res) => {
if (req.url === '/events') {
sse.handleConnection(req, res);
} else if (req.url === '/push' && req.method === 'POST') {
// 模拟推送消息
let body = '';
req.on('data', chunk => body += chunk);
req.on('end', () => {
const count = sse.broadcast(JSON.parse(body));
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ sent: count }));
});
} else {
res.writeHead(404);
res.end('Not Found');
}
});
server.listen(3001, () => console.log('SSE 服务运行在 :3001'));
📌 **记住:**SSE 的响应头中
Cache-Control: no-cache和X-Accel-Buffering: no缺一不可,否则 Nginx、CDN 等中间层会缓存数据,导致客户端收不到实时消息。
📋 前端客户端实现
浏览器原生的 EventSource API 虽然简洁,但有两个致命缺陷:不支持自定义请求头(无法携带 Authorization)和只能使用 GET 方法。下面分别展示原生用法和生产级封装:
// ❌ 原生 EventSource 的局限性
const source = new EventSource('/events');
source.onmessage = (e) => console.log('收到:', e.data);
source.onerror = (e) => console.error('连接错误');
// ❌ 问题:无法携带 Authorization 头,无法 POST body
// 对于需要认证的 API 场景完全不够用
// ✅ 生产级 SSE 客户端(支持认证、重连、超时)
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
headers: options.headers || {},
retryInterval: options.retryInterval || 3000,
maxRetries: options.maxRetries || 10,
timeout: options.timeout || 120000, // 2 分钟超时
body: options.body || null,
method: options.method || 'GET',
};
this.controller = null;
this.retryCount = 0;
this.listeners = new Map();
this.lastEventId = null;
}
// 用 fetch 实现 SSE(支持自定义 header 和 POST)
async connect() {
this.controller = new AbortController();
const headers = {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
...this.options.headers,
};
// 断线重连时传递 Last-Event-ID
if (this.lastEventId) {
headers['Last-Event-ID'] = this.lastEventId;
}
try {
const response = await fetch(this.url, {
method: this.options.method,
headers,
body: this.options.body ? JSON.stringify(this.options.body) : undefined,
signal: this.controller.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
this.retryCount = 0; // 连接成功,重置重试计数
await this._readStream(response.body);
} catch (err) {
if (err.name === 'AbortError') return; // 主动断开
console.error(`SSE 连接失败: ${err.message}`);
this._scheduleReconnect();
}
}
// 逐行解析 SSE 流
async _readStream(body) {
const reader = body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let eventType = 'message';
let eventId = null;
let eventData = '';
// 超时检测
let timeoutId = this._resetTimeout();
while (true) {
const { done, value } = await reader.read();
if (done) break;
clearTimeout(timeoutId);
timeoutId = this._resetTimeout();
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留未完成的行
for (const line of lines) {
if (line.startsWith('event:')) {
eventType = line.slice(6).trim();
} else if (line.startsWith('id:')) {
eventId = line.slice(3).trim();
this.lastEventId = eventId;
} else if (line.startsWith('data:')) {
eventData += (eventData ? '\n' : '') + line.slice(5).trim();
} else if (line === '') {
// 空行 = 消息结束,触发事件
if (eventData) {
this._emit(eventType, {
data: eventData,
id: eventId,
type: eventType,
});
}
eventType = 'message';
eventId = null;
eventData = '';
}
// 注释行(: 开头)是心跳,忽略
}
}
clearTimeout(timeoutId);
}
_resetTimeout() {
return setTimeout(() => {
console.warn('SSE 超时,断开重连');
this.disconnect();
this._scheduleReconnect();
}, this.options.timeout);
}
_scheduleReconnect() {
if (this.retryCount >= this.options.maxRetries) {
this._emit('fatal', { data: '达到最大重试次数' });
return;
}
this.retryCount++;
const delay = this.options.retryInterval * Math.min(this.retryCount, 5);
console.log(`第 ${this.retryCount} 次重连,${delay}ms 后...`);
setTimeout(() => this.connect(), delay);
}
on(event, callback) {
if (!this.listeners.has(event)) this.listeners.set(event, []);
this.listeners.get(event).push(callback);
return this;
}
_emit(event, data) {
const callbacks = this.listeners.get(event) || [];
callbacks.forEach(cb => cb(data));
// 同时触发通配符
const wildcard = this.listeners.get('*') || [];
wildcard.forEach(cb => cb({ ...data, event }));
}
disconnect() {
if (this.controller) this.controller.abort();
}
}
// 使用示例:连接需要认证的 AI 流式 API
const sse = new SSEClient('https://api.example.com/chat/stream', {
headers: { 'Authorization': 'Bearer sk-xxxx' },
method: 'POST',
body: { model: 'gpt-4', messages: [{ role: 'user', content: '你好' }] },
retryInterval: 2000,
maxRetries: 5,
});
sse.on('message', (e) => {
const chunk = JSON.parse(e.data);
document.getElementById('output').textContent += chunk.choices[0]?.delta?.content || '';
});
sse.on('done', () => {
console.log('流式输出完成');
sse.disconnect();
});
sse.on('fatal', () => alert('连接不可用,请刷新页面'));
sse.connect();
🤖 二、SSE 在 AI 流式场景中的深度实践
当前几乎所有主流 LLM API 都使用 SSE 作为流式传输协议。理解 SSE 在 AI 场景中的特殊处理方式,是每个现代后端开发者的基本功。
📋 AI 流式协议的统一规范
不同厂商的 AI API 在 SSE 事件格式上高度趋同,都遵循 OpenAI 首创的 chat.completion.chunk 模式:
// 统一的 AI 流式消费端:兼容 OpenAI / Claude / 国产大模型
async function streamAIChat(apiUrl, apiKey, messages) {
const response = await fetch(apiUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${apiKey}`,
'Accept': 'text/event-stream',
},
body: JSON.stringify({
model: 'deepseek-chat',
messages,
stream: true, // 关键参数:启用流式
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
let fullContent = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
const trimmed = line.trim();
// 跳过空行和非 data 行
if (!trimmed || !trimmed.startsWith('data:')) continue;
const dataStr = trimmed.slice(5).trim();
// 流结束标记
if (dataStr === '[DONE]') {
return fullContent;
}
try {
const parsed = JSON.parse(dataStr);
const delta = parsed.choices?.[0]?.delta?.content || '';
fullContent += delta;
// 实时渲染到页面
yield { content: delta, done: false };
} catch (e) {
// 忽略无法解析的行(可能是心跳或格式错误)
console.warn('SSE 解析跳过:', dataStr);
}
}
}
return fullContent;
}
// 在 Vue/React 中使用
async function handleChat() {
const stream = streamAIChat(
'https://api.deepseek.com/v1/chat/completions',
'sk-your-key',
[{ role: 'user', content: '用 JavaScript 实现一个 LRU 缓存' }]
);
for await (const chunk of stream) {
// 逐 token 渲染到 UI
outputElement.textContent += chunk.content;
}
}
⚠️ **警告:**AI 流式 API 中,
[DONE]标记是 OpenAI 规范的约定,但不是所有厂商都遵循。Claude API 使用message_stop事件类型,国产大模型各有差异。务必针对每个 API 文档做兼容处理。
📋 性能对比:SSE vs WebSocket vs 长轮询
在 AI 流式场景中选择传输协议,直接影响用户体验和服务器成本:
| 维度 | SSE | WebSocket | 长轮询 |
|---|---|---|---|
| 连接建立开销 | HTTP 1 次握手 | HTTP + 升级握手 | 每次 HTTP 握手 |
| 消息延迟 | 50-100ms | 10-50ms | 500ms-2s |
| 服务器内存/连接 | 低(单向) | 中(双向) | 高(反复创建) |
| 自动重连 | ✅ 内置 | ❌ 需手动实现 | ✅ 天然支持 |
| 代理/CDN 兼容 | ✅ 完美(标准 HTTP) | ⚠️ 需配置 | ✅ 完美 |
| 浏览器并发限制 | ⚠️ 6 个域名限制 | 无限制 | 无限制 |
| 适合场景 | 服务器→客户端单向推送 | 双向实时通信 | 兼容性兜底方案 |
⚡ **关键结论:**对于 AI 流式输出这种「服务器单向推送」的场景,SSE 是最优解。它基于标准 HTTP,不需要特殊的代理配置,天然支持 CDN 和负载均衡,且代码复杂度远低于 WebSocket。
📋 后端代理模式:统一转发 LLM 流
实际项目中,前端不会直连 LLM API,而是通过自己的后端做代理。下面是一个 Express + SSE 的完整代理实现:
// 后端代理 LLM 流式响应,同时做 Token 计量和日志
const express = require('express');
const app = express();
app.use(express.json());
app.post('/api/chat/stream', async (req, res) => {
const { messages, model = 'deepseek-chat' } = req.body;
const userId = req.user?.id; // 从 JWT 中获取
// 设置 SSE 响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
let totalTokens = 0;
let fullResponse = '';
try {
const llmResponse = await fetch('https://api.deepseek.com/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.LLM_API_KEY}`,
},
body: JSON.stringify({ model, messages, stream: true }),
});
const reader = llmResponse.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// 客户端断开时清理上游连接
let aborted = false;
req.on('close', () => { aborted = true; reader.cancel(); });
while (!aborted) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed.startsWith('data:')) continue;
// 直接透传 LLM 的 SSE 数据给前端
res.write(trimmed + '\n\n');
// 同时解析内容做计量
const dataStr = trimmed.slice(5).trim();
if (dataStr !== '[DONE]') {
try {
const parsed = JSON.parse(dataStr);
const content = parsed.choices?.[0]?.delta?.content || '';
fullResponse += content;
totalTokens += parsed.usage?.total_tokens || 0;
} catch (e) { /* 心跳等非 JSON 行 */ }
}
}
}
// 流结束后记录日志和用量
console.log(`用户 ${userId} | 模型 ${model} | tokens: ${totalTokens}`);
await saveUsageLog(userId, model, totalTokens, fullResponse);
} catch (err) {
console.error('LLM 代理错误:', err);
res.write(`event: error\ndata: ${JSON.stringify({ error: '服务暂时不可用' })}\n\n`);
}
res.end();
});
💡 **提示:**上游 LLM API 超时通常为 120 秒。如果用户在流式过程中断开连接(
req.on('close')),务必立即取消上游的 reader,避免浪费 API 额度。
🛡️ 三、生产环境避坑指南
📋 坑点 1:Nginx 缓冲导致消息延迟
这是 SSE 部署中最常见也最隐蔽的问题。Nginx 默认会缓冲 upstream 响应,导致所有 SSE 消息被攒到一起发送,客户端体验为「卡了半天突然一大段」。
# Nginx 反向代理 SSE 的正确配置
location /api/chat/stream {
proxy_pass http://backend;
# ✅ 关键配置:禁用代理缓冲
proxy_buffering off;
# ✅ 关键配置:禁用 chunked 传输编码转换
proxy_http_version 1.1;
proxy_set_header Connection '';
# ✅ 设置较长的超时(AI 流式可能持续 2 分钟+)
proxy_read_timeout 300s;
proxy_send_timeout 300s;
# ✅ 传递客户端真实 IP 和 Last-Event-ID
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Last-Event-ID $http_last_event_id;
}
📋 坑点 2:浏览器 6 连接限制
HTTP/1.1 规范限制浏览器对同一域名最多 6 个并发连接。如果一个 SSE 连接长期占用,其他页面请求可能被阻塞。
| 场景 | 影响 | 解决方案 |
|---|---|---|
| 用户打开多个标签页 | 第 7 个标签页 SSE 连不上 | 使用 BroadcastChannel 共享连接,或升级 HTTP/2 |
| SSE 长连接期间发 XHR | 请求排队等待 | 为 SSE 使用独立子域名(如 sse.example.com) |
| HTTP/2 环境 | 无限制 | HTTP/2 多路复用消除了 6 连接限制 |
// 用 BroadcastChannel 在多标签页间共享一个 SSE 连接
const channel = new BroadcastChannel('sse-shared');
let isLeader = false;
const lock = new BroadcastChannel('sse-lock');
// 简单的主标签页选举
navigator.locks?.request('sse-connection', async () => {
isLeader = true;
const sse = new SSEClient('/events');
sse.on('message', (e) => {
// 主标签页收到消息后广播给其他标签页
channel.postMessage(JSON.parse(e.data));
});
sse.connect();
// 持有锁直到标签页关闭
await new Promise(() => {});
});
// 非主标签页通过 BroadcastChannel 接收
channel.onmessage = (e) => {
handleSSEMessage(e.data);
};
📋 坑点 3:错误处理与优雅降级
SSE 连接可能因网络波动、服务重启、负载均衡器超时等原因断开。生产环境必须实现完善的错误处理链路:
// 生产级错误处理策略
class RobustSSEClient {
// 三级降级策略
async connectWithFallback() {
// 第一级:尝试 SSE(最优体验)
try {
await this.connectSSE();
return 'sse';
} catch (e) {
console.warn('SSE 不可用,降级到长轮询');
}
// 第二级:降级到长轮询(兼容性最好)
try {
await this.connectLongPolling();
return 'long-polling';
} catch (e) {
console.warn('长轮询不可用,降级到短轮询');
}
// 第三级:降级到短轮询(最保底)
this.connectShortPolling();
return 'short-polling';
}
// SSE 连接(带指数退避重试)
async connectSSE() {
const MAX_RETRY = 5;
for (let i = 0; i < MAX_RETRY; i++) {
try {
const client = new SSEClient(this.url, {
headers: this.headers,
timeout: 60000,
});
await client.connect();
return client;
} catch (err) {
// 指数退避:1s, 2s, 4s, 8s, 16s
const delay = Math.min(1000 * Math.pow(2, i), 16000);
// 添加随机抖动,避免雷群效应
const jitter = delay * 0.5 * Math.random();
await new Promise(r => setTimeout(r, delay + jitter));
}
}
throw new Error('SSE 连接失败,已重试 5 次');
}
// 长轮询降级
async connectLongPolling() {
while (true) {
const response = await fetch(`${this.url}/poll?timeout=30000`, {
headers: this.headers,
});
const events = await response.json();
events.forEach(event => this._emit('message', event));
}
}
// 短轮询兜底
connectShortPolling(interval = 5000) {
this._pollTimer = setInterval(async () => {
const response = await fetch(`${this.url}/snapshot`, {
headers: this.headers,
});
const data = await response.json();
this._emit('message', data);
}, interval);
}
}
⚠️ **警告:**永远不要在 SSE 连接断开后立即重连。无间隔重连会导致「雷群效应」——如果服务端重启,所有客户端同时重连可能瞬间打垮服务。务必使用指数退避(Exponential Backoff)加随机抖动(Jitter)。
📊 四、实战案例:构建实时 AI 对话组件
将前面的所有知识整合,构建一个完整的 Vue 3 流式对话组件:
<!-- StreamChat.vue — Vue 3 组合式 API 实现的流式对话组件 -->
<script setup>
import { ref, reactive, onUnmounted, nextTick } from 'vue'
const messages = reactive([])
const inputText = ref('')
const isStreaming = ref(false)
const connectionStatus = ref('disconnected') // disconnected | connecting | connected
const outputRef = ref(null)
let controller = null
async function sendMessage() {
if (!inputText.value.trim() || isStreaming.value) return
const userMsg = { role: 'user', content: inputText.value }
messages.push(userMsg)
inputText.value = ''
isStreaming.value = true
connectionStatus.value = 'connecting'
// 创建 AI 回复占位
const aiMsg = reactive({ role: 'assistant', content: '', status: 'streaming' })
messages.push(aiMsg)
controller = new AbortController()
try {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages: messages.slice(0, -1) }),
signal: controller.signal,
})
connectionStatus.value = 'connected'
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim().startsWith('data:')) continue
const data = line.trim().slice(5).trim()
if (data === '[DONE]') continue
try {
const parsed = JSON.parse(data)
const delta = parsed.choices?.[0]?.delta?.content || ''
aiMsg.content += delta
// 自动滚动到底部
await nextTick()
outputRef.value?.scrollTo({ top: outputRef.value.scrollHeight })
} catch (e) { /* 忽略 */ }
}
}
aiMsg.status = 'done'
} catch (err) {
if (err.name !== 'AbortError') {
aiMsg.content = `❌ 请求失败: ${err.message}`
aiMsg.status = 'error'
}
} finally {
isStreaming.value = false
connectionStatus.value = 'disconnected'
}
}
function stopStreaming() {
controller?.abort()
isStreaming.value = false
}
onUnmounted(() => controller?.abort())
</script>
<template>
<div class="chat-container">
<div class="status-bar">
<span :class="connectionStatus">
{{ { disconnected: '⚪ 未连接', connecting: '🟡 连接中...', connected: '🟢 已连接' }[connectionStatus] }}
</span>
</div>
<div class="messages" ref="outputRef">
<div v-for="(msg, i) in messages" :key="i" :class="msg.role">
<pre>{{ msg.content }}<span v-if="msg.status === 'streaming'" class="cursor">▋</span></pre>
</div>
</div>
<form @submit.prevent="sendMessage" class="input-area">
<input v-model="inputText" placeholder="输入消息..." :disabled="isStreaming" />
<button v-if="!isStreaming" type="submit">发送</button>
<button v-else type="button" @click="stopStreaming">停止</button>
</form>
</div>
</template>
💡 五、最佳实践总结
经过大量生产实践,以下是 SSE 开发的核心 checklist:
✅ 推荐做法:
- ✅ 始终设置
X-Accel-Buffering: no响应头,防止 Nginx/CDN 缓冲 - ✅ 实现心跳机制(每 30 秒发送注释行
: heartbeat),防止代理超时断连 - ✅ 使用
Last-Event-ID实现断线续传,不丢失任何事件 - ✅ 采用指数退避 + 随机抖动的重连策略
- ✅ 为 SSE 连接设置独立子域名,避免阻塞同域 HTTP 请求
- ✅ 上游客户端断开时,立即取消下游连接,节省资源
❌ 避免做法:
- ❌ 使用原生
EventSource连接需要认证的 API(不支持自定义 Header) - ❌ 断线后立即重连(会导致雷群效应)
- ❌ 在 Nginx 中使用默认的
proxy_buffering on(消息延迟严重) - ❌ 忽略 HTTP/2 环境下 6 连接限制已解除的事实
🔧 SSE 最佳搭档工具:
| 工具 | 用途 | 推荐指数 |
|---|---|---|
| jsjson.com JSON 格式化 | 在线格式化 SSE 返回的 JSON 数据 | ⭐⭐⭐⭐⭐ |
| EventSource Polyfill | 兼容旧浏览器 + 支持自定义 Header | ⭐⭐⭐⭐ |
| SSE.js (npm) | 轻量级 Node.js SSE 客户端库 | ⭐⭐⭐⭐ |
| Vercel AI SDK | 开箱即用的 AI 流式 UI 组件 | ⭐⭐⭐⭐⭐ |
SSE 不是什么新技术,但在 AI 流式输出成为标配的 2026 年,它焕发了新生。相比 WebSocket 的重量级双向通道,SSE 以其极简的协议、原生的重连机制、标准 HTTP 的兼容性,成为了服务器单向推送场景的最优解。掌握本文的实战模式,你就具备了构建任何实时流式应用的能力。