Server-Sent Events 深度实战:AI 时代流式通信的首选方案

SSE 在 AI 流式 API 中全面胜出的深层原因,从协议原理到生产级 Node.js 实现,对比 WebSocket、Long Polling 的性能与成本差异,附完整可运行代码。

前端开发 2026-06-02 15 分钟

当 OpenAI、Anthropic、Google 三大 AI 巨头不约而同选择 SSE(Server-Sent Events)作为流式输出协议时,这个诞生于 2006 年的"老"技术在 2026 年迎来了真正的高光时刻。据 HTTP Archive 统计,使用 SSE 的生产站点数量在过去两年增长了 470%,其中超过 80% 与 AI 应用相关。

但 SSE 的价值远不止 AI 聊天流式输出。实时通知、进度追踪、日志流、股票行情——任何「服务器单向推送」场景,SSE 都是比 WebSocket 更轻量、更可靠的选择。本文将从协议原理到生产级实现,帮你彻底掌握这个被严重低估的技术。

🔧 一、SSE 协议原理与核心机制

📡 协议本质:HTTP 上的单向流

SSE 不是 WebSocket 那样的独立协议,它就是标准 HTTP。客户端发起普通 GET 请求,服务器返回 Content-Type: text/event-stream,然后保持连接不断开,持续推送数据。

这个设计带来了三个关键优势:

  • 天然穿透代理和防火墙 — 因为它就是 HTTP 流
  • 自动重连EventSource API 内置断线重连机制
  • 零额外依赖 — 不需要 ws、socket.io 之类的库

协议格式极其简洁,每条消息由一个或多个 field: value 行组成,以空行分隔:

event: message
id: 123
retry: 5000
data: {"content": "你好"}

data: 多行数据第一行
data: 多行数据第二行

字段说明:

字段 作用 必填
data 消息内容,可多行,每行以 data: 前缀 ✅ 是
event 事件类型,默认为 message ❌ 否
id 消息 ID,用于断线重连时的 Last-Event-ID ❌ 否
retry 告诉客户端重连间隔(毫秒) ❌ 否

📌 记住:SSE 消息以两个换行符\n\n)结尾。这是一个常见的坑——如果你的 data 内容本身就包含换行,每行都必须以 data: 开头,否则会被当成多条消息。

🔄 与 WebSocket 的本质区别

很多人把 SSE 和 WebSocket 混为一谈,但它们解决的是完全不同的问题:

维度 SSE WebSocket
通信方向 服务器 → 客户端(单向) 双向
协议 HTTP/1.1 或 HTTP/2 独立的 ws:// 协议
自动重连 ✅ 内置 ❌ 需手动实现
二进制数据 ❌ 仅文本 ✅ 支持
负载均衡 ✅ 天然支持 ⚠️ 需要粘性会话
连接数限制 HTTP/1.1 有 6 连接限制,HTTP/2 无
代理穿透 ✅ 无需配置 ⚠️ 部分代理不支持

⚡ **关键结论:**如果你的场景是「服务器推数据给客户端」(AI 流式输出、通知、日志),SSE 在工程复杂度上完胜 WebSocket。只有在需要双向实时通信(聊天、协同编辑、游戏)时才应该考虑 WebSocket。

🚀 二、生产级 SSE 服务端实现

🟢 Node.js 原生实现(零依赖)

大多数教程只展示最基础的 Hello World,下面是一个生产级的 Node.js SSE 实现,包含心跳保活、客户端管理、优雅关闭:

// server.js — 生产级 SSE 服务端(Node.js 原生,零依赖)
import http from 'node:http';

// 存储所有活跃的 SSE 连接
const clients = new Set();

// 心跳间隔(毫秒),防止代理/防火墙超时断连
const HEARTBEAT_INTERVAL = 15_000;

function sseHandler(req, res) {
  if (req.method !== 'GET') {
    res.writeHead(405).end('Method Not Allowed');
    return;
  }

  // 设置 SSE 必要的响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',           // 禁止缓存
    'Connection': 'keep-alive',            // 保持连接
    'X-Accel-Buffering': 'no',             // 关键!禁用 Nginx 缓冲
    'Access-Control-Allow-Origin': '*',    // CORS
  });

  // 发送初始连接确认
  res.write('event: connected\ndata: {"status":"ok"}\n\n');

  // 注册客户端
  clients.add(res);
  console.log(`[SSE] 客户端连接,当前 ${clients.size} 个`);

  // 心跳:定期发送注释行(以 : 开头的行会被客户端忽略)
  const heartbeat = setInterval(() => {
    res.write(': heartbeat\n\n');
  }, HEARTBEAT_INTERVAL);

  // 客户端断开时清理
  req.on('close', () => {
    clearInterval(heartbeat);
    clients.delete(res);
    console.log(`[SSE] 客户端断开,剩余 ${clients.size} 个`);
  });
}

// 向所有客户端广播消息
function broadcast(event, data) {
  const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  for (const client of clients) {
    client.write(message);
  }
}

// 向单个客户端发送消息
function send(client, event, data, id) {
  let msg = '';
  if (id) msg += `id: ${id}\n`;
  if (event) msg += `event: ${event}\n`;
  msg += `data: ${JSON.stringify(data)}\n\n`;
  client.write(msg);
}

// 创建服务器
const server = http.createServer((req, res) => {
  if (req.url === '/events') {
    sseHandler(req, res);
  } else {
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end('<h1>SSE Server Running</h1>');
  }
});

// 模拟每 3 秒推送一次数据
setInterval(() => {
  broadcast('update', {
    time: new Date().toISOString(),
    online: clients.size,
  });
}, 3000);

// 优雅关闭
process.on('SIGTERM', () => {
  console.log('[SSE] 收到 SIGTERM,关闭所有连接...');
  for (const client of clients) {
    client.write('event: shutdown\ndata: {"reason":"server stopping"}\n\n');
    client.end();
  }
  server.close(() => process.exit(0));
});

server.listen(3000, () => console.log('SSE server on :3000/events'));

⚠️ 警告:X-Accel-Buffering: no 这个响应头至关重要。如果你的服务器前面有 Nginx 反向代理,Nginx 默认会缓冲响应,导致 SSE 消息无法实时到达客户端。没有这个头,你的 SSE 在开发环境正常,一上生产就"延迟"。

🔥 AI 流式输出的正确姿势

AI 应用是 SSE 最大的用武之地。下面是一个模拟 OpenAI 风格的流式输出实现,展示了如何正确处理 Token 逐字输出:

// ai-stream.js — 模拟 AI 流式输出(OpenAI 兼容格式)
import http from 'node:http';

const DEMO_RESPONSE = 'Server-Sent Events 是 AI 时代最优雅的流式传输方案。它基于 HTTP 协议,天然兼容现有的基础设施,无需额外的协议升级。';

function streamAIResponse(req, res) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'X-Accel-Buffering': 'no',
  });

  const tokens = DEMO_RESPONSE.split('');
  let index = 0;
  let content = '';

  // 模拟逐 Token 输出,每个 Token 间隔 30-80ms
  const interval = setInterval(() => {
    if (index >= tokens.length) {
      // 发送结束标记
      res.write('data: [DONE]\n\n');
      clearInterval(interval);
      res.end();
      return;
    }

    const token = tokens[index];
    content += token;

    // OpenAI 兼容的 SSE 格式
    const chunk = {
      id: `chatcmpl-${Date.now()}`,
      object: 'chat.completion.chunk',
      choices: [{
        index: 0,
        delta: { content: token },
        finish_reason: null,
      }],
    };

    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    index++;
  }, 30 + Math.random() * 50);

  req.on('close', () => {
    clearInterval(interval);
  });
}

http.createServer((req, res) => {
  if (req.url === '/v1/chat/completions') {
    streamAIResponse(req, res);
  }
}).listen(3001, () => console.log('AI stream on :3001'));

💡 提示:真实生产环境中,AI 流式输出需要注意两个问题:一是流式中断恢复——客户端断线后应该能从断点继续,而不是重新生成;二是反压控制——如果网络慢于生成速度,需要暂停生成避免内存溢出。Node.js 的 stream.Writablewrite() 返回 false 时就是反压信号。

🟣 客户端:EventSource 的正确用法

浏览器端的 EventSource API 非常简洁,但有几个容易忽略的细节:

// client.js — 生产级 EventSource 客户端
class SSEClient {
  constructor(url, options = {}) {
    this.url = url;
    this.handlers = new Map();
    this.reconnectInterval = options.reconnectInterval || 3000;
    this.maxRetries = options.maxRetries || 10;
    this.retryCount = 0;
    this.connect();
  }

  connect() {
    // EventSource 自带重连,但我们要自定义逻辑
    this.es = new EventSource(this.url);

    this.es.onopen = () => {
      console.log('[SSE] 连接已建立');
      this.retryCount = 0;
      this.emit('_connected');
    };

    // 默认 message 事件
    this.es.onmessage = (event) => {
      this.emit('message', this.parseData(event));
    };

    // 注册所有自定义事件
    for (const [eventName] of this.handlers) {
      if (eventName !== 'message') {
        this.es.addEventListener(eventName, (event) => {
          this.emit(eventName, this.parseData(event));
        });
      }
    }

    this.es.onerror = (err) => {
      this.retryCount++;
      console.error(`[SSE] 连接错误 (第 ${this.retryCount} 次重连)`);

      if (this.retryCount >= this.maxRetries) {
        console.error('[SSE] 达到最大重试次数,停止重连');
        this.es.close();
        this.emit('_max_retries');
      }
    };
  }

  parseData(event) {
    try {
      return JSON.parse(event.data);
    } catch {
      return event.data;
    }
  }

  on(event, callback) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(callback);
    return this; // 支持链式调用
  }

  emit(event, data) {
    const callbacks = this.handlers.get(event) || [];
    callbacks.forEach(cb => cb(data));
  }

  close() {
    this.es?.close();
    console.log('[SSE] 连接已关闭');
  }
}

// 使用示例
const sse = new SSEClient('/events')
  .on('connected', (data) => console.log('服务器确认:', data))
  .on('update', (data) => {
    document.getElementById('status').textContent =
      `在线: ${data.online}人 | 更新: ${data.time}`;
  })
  .on('message', (data) => console.log('默认消息:', data));

📌 记住:EventSource 默认只监听 message 事件。如果你的服务器发送了 event: update,你必须用 addEventListener('update', ...) 来接收,而不是 onmessage。这是新手最常踩的坑。

💡 三、HTTP/2 下的 SSE 与性能优化

⚡ HTTP/2 解决了 SSE 的最大历史包袱

在 HTTP/1.1 时代,浏览器对同一域名有 6 个并发连接的限制。每个 SSE 连接占用一个,加上页面本身的请求,很容易出现连接饥饿。

HTTP/2 通过多路复用(Multiplexing)彻底解决了这个问题:所有请求共享一个 TCP 连接,通过流 ID 区分。SSE 连接不再占用额外的连接槽位。

环境 SSE 连接开销 其他请求是否受影响
HTTP/1.1 占用 1 个连接 ⚠️ 最多 6 个并发
HTTP/2 共享 1 个连接 ✅ 无影响

🛡️ 生产部署 Checklist

把 SSE 部署到生产环境,以下是必须检查的事项:

Nginx 配置:

# nginx.conf — SSE 友好的反向代理配置
location /events {
    proxy_pass http://127.0.0.1:3000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';       # 清除 Connection: close
    proxy_buffering off;                  # 禁用缓冲!
    proxy_cache off;                      # 禁用缓存!
    chunked_transfer_encoding off;        # 禁用分块编码
    proxy_read_timeout 86400s;            # 24 小时超时
    proxy_send_timeout 86400s;
}

性能优化要点:

  • ✅ 启用 HTTP/2 — 解决连接数限制
  • ✅ 使用 gzip 压缩 JSON 数据 — SSE 消息通常 JSON 格式,压缩率高达 80%
  • ❌ 不要在 SSE 连接中传输二进制数据 — SSE 只支持文本,二进制用 WebSocket
  • ❌ 不要为每个请求创建新的 SSE 连接 — 保持长连接复用
  • ⚠️ 设置合理的超时 — 云服务商(AWS ALB 默认 60 秒)可能主动断开空闲连接,用心跳保活

📊 SSE vs WebSocket vs Long Polling 全面对比

维度 SSE WebSocket Long Polling
实现复杂度 ⭐ 极低 ⭐⭐⭐ 中等 ⭐⭐ 低
服务端依赖 标准 HTTP 需 ws 库 标准 HTTP
自动重连 ✅ 内置 ❌ 手动 ❌ 手动
浏览器兼容性 除 IE 外全支持 全支持 全支持
HTTP/2 多路复用 ❌ 独立连接
反向代理兼容 ✅ 极好 ⚠️ 需配置 ✅ 极好
消息延迟 ~1ms ~1ms ~100ms-1s
服务器内存 低(每个连接几 KB) 中等(临时请求堆积)
适用场景 通知、流式、日志 聊天、游戏、协同 兼容旧浏览器

⚡ **关键结论:**对于 AI 流式输出、实时通知、日志流、进度追踪这四大场景,SSE 的投入产出比远高于 WebSocket。选择 SSE 不是"退而求其次",而是工程上的理性决策。

⚠️ 四、常见坑点与避坑指南

🕳️ 坑点 1:Nginx 默认缓冲导致消息"批量到达"

这是 SSE 生产部署中最常见的问题。表现是:开发环境一切正常,部署到生产后消息突然变得"一批一批"到达,而不是逐条实时推送。

**解决方案:**除了前面 Nginx 配置中的 proxy_buffering off,服务端也必须设置 X-Accel-Buffering: no 响应头作为双保险。

🕳️ 坑点 2:云平台的连接超时

AWS ALB 默认 60 秒超时,Cloudflare 免费版 100 秒超时。超过时间没有数据传输,连接就会被中间代理强制断开。

**解决方案:**每 15-30 秒发送一次心跳。SSE 协议规定以冒号开头的行是注释,客户端会忽略但连接保持活跃:

// 心跳:发送注释行,客户端不会收到 message 事件
res.write(': keepalive\n\n');

🕳️ 坑点 3:EventSource 不支持 POST 请求

EventSource 只支持 GET 请求。如果你需要发送请求体(比如 AI 对话的上下文),有两个方案:

**方案一:**先 POST 获取数据,再用 SSE 监听结果

// 先发送对话上下文
const { sessionId } = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages }),
}).then(r => r.json());

// 再用 SSE 监听流式输出
const es = new EventSource(`/api/chat/${sessionId}/stream`);

**方案二:**使用 fetch() + ReadableStream 替代 EventSource

// fetch-sse.js — 用 fetch 替代 EventSource,支持 POST
async function fetchSSE(url, body, onChunk) {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n\n');
    buffer = lines.pop(); // 保留未完成的部分

    for (const line of lines) {
      if (!line.trim()) continue;
      // 解析 SSE 格式
      const dataMatch = line.match(/^data: (.+)$/m);
      if (dataMatch) {
        const data = dataMatch[1];
        if (data === '[DONE]') return;
        try {
          onChunk(JSON.parse(data));
        } catch {
          onChunk(data);
        }
      }
    }
  }
}

// 使用示例:OpenAI 兼容的流式调用
fetchSSE('/v1/chat/completions', {
  model: 'gpt-4o',
  messages: [{ role: 'user', content: '解释 SSE 协议' }],
  stream: true,
}, (chunk) => {
  const content = chunk.choices?.[0]?.delta?.content;
  if (content) {
    document.getElementById('output').textContent += content;
  }
});

💡 提示:fetch() + ReadableStream 方案是目前 AI 应用的主流做法。Vercel AI SDK、LangChain.js 等框架内部都使用这种方式。它不受 GET 限制,也能更好地控制流的生命周期。

🕳️ 坑点 4:跨域请求的 Credentials

默认情况下,EventSource 不会携带 Cookie。如果你的 SSE 端点需要身份验证:

// 带 Cookie 的 EventSource(部分浏览器支持)
const es = new EventSource('/events', { withCredentials: true });

服务端也需要设置:

res.writeHead(200, {
  'Content-Type': 'text/event-stream',
  'Access-Control-Allow-Origin': 'https://your-domain.com', // 不能用 *
  'Access-Control-Allow-Credentials': 'true',
});

⚠️ **警告:**使用 withCredentials: true 时,Access-Control-Allow-Origin 不能设置为 *,必须指定具体的域名。这是 CORS 规范的硬性要求。

🎯 总结与工具推荐

SSE 在 2026 年的爆发不是偶然。当 AI 应用成为主流,流式输出成为刚需,SSE 的"简单"恰恰成了它最大的竞争力——它不需要额外的协议、不需要特殊的库、不需要复杂的基础设施改造。

选型建议:

  • ✅ AI 流式输出(ChatGPT 风格)→ SSE + fetch ReadableStream
  • ✅ 实时通知、消息推送 → SSE + EventSource
  • ✅ 服务器日志实时流 → SSE
  • ✅ 进度追踪(文件上传、任务执行)→ SSE
  • ❌ 多人在线游戏 → WebSocket
  • ❌ 实时协同编辑 → WebSocket + CRDT

相关工具推荐:

SSE 不是 WebSocket 的"低配版",它是 HTTP 生态下流式通信的正确答案。下次需要服务器推送数据时,先想想 SSE,它可能比你想象的更强大。

📚 相关文章