Server-Sent Events 实战指南:AI 时代流式通信的最佳选择

深入解析 SSE 技术原理、与 WebSocket 对比、在 LLM API 流式输出中的核心作用,包含 Node.js/Go/浏览器端完整实战代码,覆盖连接恢复、错误处理、生产部署最佳实践。

前端开发 2026-06-08 12 分钟

当 OpenAI 在 2022 年末让 ChatGPT 以「打字机」效果流式输出回答时,背后的技术并非 WebSocket——而是一个诞生于 2006 年、沉寂多年的 Web 标准:Server-Sent Events(SSE)。如今,从 Claude 到 Gemini,从 Copilot 到 Cursor,几乎所有 AI 产品都在用 SSE 做流式传输。但大多数开发者对 SSE 的理解还停留在 EventSource 三行代码的层面。本文将从协议原理到生产落地,带你彻底搞懂这个被低估的实时通信方案。

🔌 一、SSE 原理与三种实时通信方案对比

1.1 SSE 到底是什么?

SSE 是 HTML5 规范的一部分(WHATWG Streams API),核心机制非常简单:客户端发起一个普通 HTTP 请求,服务端不关闭连接,持续以 text/event-stream 格式推送数据。每条消息由一个或多个 field: value 行组成,用空行分隔。

data: {"token": "你"}

data: {"token": "好"}

data: [DONE]

与 WebSocket 的关键区别在于:SSE 是单向的(服务端→客户端),基于普通 HTTP/1.1 或 HTTP/2,天然支持自动重连、Last-Event-ID 恢复、以及所有现有的 HTTP 基础设施(代理、CDN、负载均衡)。

1.2 三种实时方案深度对比

维度 SSE WebSocket Long Polling
方向 单向(服务端→客户端) 全双工 模拟双向
协议 HTTP/1.1 或 HTTP/2 独立协议(ws://) HTTP/1.1
自动重连 ✅ 内置 ❌ 需手动实现 ❌ 需手动实现
断点续传 ✅ Last-Event-ID ❌ 需自行实现 ❌ 不支持
二进制数据 ❌ 仅文本 ✅ 原生支持 ❌ 仅文本
代理/CDN 兼容 ✅ 完全兼容 ⚠️ 需特殊配置 ✅ 完全兼容
HTTP/2 多路复用 ✅ 共享连接 ❌ 独立连接 ✅ 共享连接
最大并发连接 浏览器限制 6/域* 无限制 浏览器限制 6/域
典型延迟 < 50ms < 20ms 100-500ms
实现复杂度 ⭐ 低 ⭐⭐⭐ 高 ⭐⭐ 中

⚠️ **警告:**浏览器对同一域名的 HTTP/1.1 连接数限制为 6 个。SSE 和 Long Polling 都受此限制。但使用 HTTP/2 时,所有请求共享一个 TCP 连接的多路复用流,6 连接限制不再成为瓶颈。

📌 记住:如果你的场景是服务端向客户端推送数据(AI 流式输出、通知推送、实时日志、股票行情),SSE 几乎总是比 WebSocket 更好的选择——更简单、更可靠、更省资源。只有当你需要真正的双向实时通信(聊天室、协同编辑、游戏)时,才应该选 WebSocket。

1.3 为什么 AI 时代 SSE 重新崛起?

三个关键原因:

  1. 流式 Token 输出天然适配单向推送:LLM 生成文本时,服务端只需不断推送 token,客户端无需发送数据
  2. HTTP/2 多路复用消除了并发限制:6 连接限制在 HTTP/2 下不再是问题
  3. 基础设施兼容性:企业网络、反向代理、WAF 对 WebSocket 的支持参差不齐,但 SSE 作为普通 HTTP 请求,走遍天下

⚡ **关键结论:**SSE 不是 WebSocket 的「阉割版」,而是在单向推送场景下更优雅、更工程化的方案。

🚀 二、生产级 SSE 服务端实现

2.1 原生 Node.js 实现

先从零实现一个 SSE 服务端,理解底层协议细节:

// Node.js 原生 SSE 服务端 —— 不依赖任何框架
import http from 'node:http';

const clients = new Set();

const server = http.createServer((req, res) => {
  if (req.url === '/events') {
    // 设置 SSE 响应头
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',          // 禁止缓存
      'Connection': 'keep-alive',            // 保持连接
      'X-Accel-Buffering': 'no',            // 关键!禁用 Nginx 缓冲
      'Access-Control-Allow-Origin': '*',    // CORS
    });

    // 发送初始注释,强制连接建立
    res.write(':ok\n\n');

    // 注册客户端
    clients.add(res);
    console.log(`客户端已连接,当前在线: ${clients.size}`);

    // 心跳:每 30 秒发送一次,防止代理/防火墙超时断开
    const heartbeat = setInterval(() => {
      res.write(':heartbeat\n\n');  // 注释行不会触发 onmessage
    }, 30000);

    // 客户端断开时清理
    req.on('close', () => {
      clients.delete(res);
      clearInterval(heartbeat);
      console.log(`客户端断开,当前在线: ${clients.size}`);
    });
  }
});

// 广播函数
function broadcast(event, data) {
  const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  for (const client of clients) {
    client.write(message);
  }
}

// 模拟每秒推送一条消息
setInterval(() => {
  broadcast('tick', { time: new Date().toISOString(), users: clients.size });
}, 1000);

server.listen(3001, () => console.log('SSE 服务运行在 :3001'));

这段代码有几个生产级要点:X-Accel-Buffering: no 头部防止 Nginx 缓冲响应导致消息延迟;心跳机制防止中间设备因空闲超时而断开连接;Set 数据结构管理活跃客户端,断开时自动清理。

2.2 Nuxt 3 / Nitro 服务端实现

如果你的项目像 jsjson.com 一样使用 Nuxt 3,可以直接在 Server Routes 中实现 SSE。Nitro 底层使用 H3 框架,处理 SSE 需要拿到原始的 Node.js Response 对象:

// server/api/chat-stream.post.ts — Nuxt 3 Server Route 实现 LLM 流式输出
import { defineEventHandler, readBody, setResponseHeaders } from 'h3';

export default defineEventHandler(async (event) => {
  const { message } = await readBody(event);
  const res = event.node.res;

  // 设置 SSE 头部
  setResponseHeaders(event, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  });

  // 模拟 LLM 流式输出(替换为真实 API 调用)
  const tokens = [
    '你好', '!', '我', '是', 'AI', '助手', '。',
    '很', '高兴', '为你', '服务', '。',
  ];

  for (const token of tokens) {
    res.write(`data: ${JSON.stringify({ token, done: false })}\n\n`);
    await new Promise(resolve => setTimeout(resolve, 80)); // 模拟生成延迟
  }

  res.write(`data: ${JSON.stringify({ token: '', done: true })}\n\n`);
  res.end();
});

💡 **提示:**Nuxt 3 的 Server Routes 默认会缓冲响应。确保调用 event.node.res.write() 直接写入 Node.js 响应流,而不是返回一个普通对象。

2.3 接入真实 LLM API(OpenAI 兼容接口)

将上面的模拟代码替换为真实的 OpenAI API 调用,实现一个生产级的 AI 流式代理:

// server/api/chat-stream.post.ts — 真实 OpenAI 流式代理
import { defineEventHandler, readBody, setResponseHeaders } from 'h3';

export default defineEventHandler(async (event) => {
  const { messages, model = 'gpt-4o' } = await readBody(event);
  const res = event.node.res;

  setResponseHeaders(event, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'X-Accel-Buffering': 'no',
  });

  const response = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
    },
    body: JSON.stringify({ model, messages, stream: true }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // 保留不完整的行

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6).trim();
      if (data === '[DONE]') {
        res.write('data: [DONE]\n\n');
        res.end();
        return;
      }
      try {
        const parsed = JSON.parse(data);
        const token = parsed.choices?.[0]?.delta?.content;
        if (token) {
          res.write(`data: ${JSON.stringify({ token })}\n\n`);
        }
      } catch { /* 跳过解析失败的行 */ }
    }
  }

  res.end();
});

这段代码展示了 SSE 的核心优势:逐 chunk 流式传输。服务端从 OpenAI API 读取数据流,解析出每个 token,立即通过 SSE 推送给客户端,整个过程中客户端保持连接不断开。

🛡️ 三、客户端实现与生产级陷阱

3.1 基础客户端:EventSource 的局限

浏览器原生的 EventSource API 虽然简单,但在生产环境中有一个致命缺陷:不支持 POST 请求和自定义请求头。这意味着你无法用它发送请求体或携带 Authorization Token。

// ❌ 错误写法:原生 EventSource 无法 POST 和携带认证头
const es = new EventSource('/api/chat-stream');
es.onmessage = (e) => console.log(e.data);
// 无法发送 { message: "你好" } 请求体
// 无法设置 Authorization: Bearer xxx 头

3.2 生产级客户端:fetch + ReadableStream

正确的做法是用 fetch API 手动解析 SSE 流,这也是所有主流 AI SDK 的实现方式:

// ✅ 正确写法:fetch 手动解析 SSE 流,支持 POST + 自定义头部
async function streamChat(message, onToken, onDone) {
  const response = await fetch('/api/chat-stream', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${getToken()}`,  // 支持认证
    },
    body: JSON.stringify({
      messages: [{ role: 'user', content: message }],
    }),
  });

  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6).trim();
        if (data === '[DONE]') {
          onDone();
          return;
        }
        try {
          const parsed = JSON.parse(data);
          onToken(parsed.token);
        } catch { /* 忽略解析错误 */ }
      }
    }
  }
  onDone();
}

// 使用方式
let result = '';
await streamChat('你好',
  (token) => { result += token; updateUI(result); },
  () => console.log('流式输出完成')
);

3.3 ⚠️ 生产环境避坑指南

这些是我在生产中踩过的坑,每一个都可能导致诡异的线上问题:

坑点 1:Nginx 代理缓冲

Nginx 默认会缓冲 upstream 响应,导致 SSE 消息不会实时推送到客户端,而是一次性发送所有缓存的数据。

# nginx.conf — 正确的 SSE 代理配置
location /api/chat-stream {
    proxy_pass http://127.0.0.1:3000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';          # 清除 Connection 头
    proxy_buffering off;                     # 关键!禁用代理缓冲
    proxy_cache off;                         # 禁用缓存
    chunked_transfer_encoding off;           # 禁用分块传输
    proxy_read_timeout 300s;                 # 5 分钟超时,适配长对话
}

坑点 2:客户端 AbortController 未清理

用户切换页面或关闭对话时,如果不主动中止 fetch 请求,连接会一直占用资源。使用 AbortController 可以优雅地取消流式传输:

// 客户端中止控制器 —— 防止连接泄漏
let currentController = null;

async function streamChatWithAbort(message, onToken) {
  // 中止上一个未完成的请求
  if (currentController) currentController.abort();
  currentController = new AbortController();

  try {
    const response = await fetch('/api/chat-stream', {
      method: 'POST',
      signal: currentController.signal,  // 传入中止信号
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message }),
    });
    // ... 解析流的逻辑同上
  } catch (err) {
    if (err.name === 'AbortError') {
      console.log('用户主动取消');
      return;
    }
    throw err;
  }
}

// 页面卸载时清理
window.addEventListener('beforeunload', () => {
  if (currentController) currentController.abort();
});

坑点 3:服务端连接数耗尽

每个 SSE 连接都是一个长时间存活的 HTTP 连接。如果不限制最大连接数,恶意用户可以轻松耗尽你的服务器资源。必须设置连接上限:

// 服务端连接数限制 —— 防止资源耗尽
const MAX_CLIENTS = 1000;
const clients = new Map(); // key: clientId, value: response

function handleSSE(req, res) {
  if (clients.size >= MAX_CLIENTS) {
    res.writeHead(429, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: '服务器繁忙,请稍后重试' }));
    return;
  }

  const clientId = crypto.randomUUID();
  clients.set(clientId, res);

  // 设置单连接超时:5 分钟无活动自动断开
  const timeout = setTimeout(() => {
    res.write('event: timeout\ndata: {"reason":"idle timeout"}\n\n');
    res.end();
    clients.delete(clientId);
  }, 300_000);

  req.on('data', () => clearTimeout(timeout)); // 有数据活动时重置定时器
  req.on('close', () => {
    clearTimeout(timeout);
    clients.delete(clientId);
  });
}

⚠️ **警告:**Node.js 默认最大 socket 数约为几千个。如果 SSE 连接数超过 1000,需要调整 server.maxConnections 和操作系统级的文件描述符限制(ulimit -n)。高并发场景建议使用专门的消息队列(如 Redis Pub/Sub)做多实例广播。

📊 四、完整实战:构建 SSE 连接管理器

将上述所有最佳实践封装成一个可复用的连接管理器,同时处理重连、心跳、认证和背压:

// sse-client.js — 生产级 SSE 客户端封装
class SSEClient {
  #controller = null;
  #retryCount = 0;
  #maxRetries = 5;
  #baseDelay = 1000;

  constructor(url, options = {}) {
    this.url = url;
    this.headers = options.headers || {};
    this.onMessage = options.onMessage || (() => {});
    this.onError = options.onError || console.error;
    this.onOpen = options.onOpen || (() => {});
    this.maxRetries = options.maxRetries ?? 5;
  }

  async connect(body = {}) {
    this.#controller = new AbortController();

    try {
      const response = await fetch(this.url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          ...this.headers,
        },
        body: JSON.stringify(body),
        signal: this.#controller.signal,
      });

      if (!response.ok) {
        if (response.status === 429) {
          const retryAfter = response.headers.get('Retry-After') || 5;
          this.#scheduleRetry(body, retryAfter * 1000);
          return;
        }
        throw new Error(`SSE 连接失败: HTTP ${response.status}`);
      }

      this.#retryCount = 0; // 连接成功,重置重试计数
      this.onOpen();

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const parts = buffer.split('\n\n');
        buffer = parts.pop() || '';

        for (const part of parts) {
          this.#parseMessage(part);
        }
      }
    } catch (err) {
      if (err.name === 'AbortError') return;
      this.#scheduleRetry(body);
    }
  }

  #parseMessage(raw) {
    let event = 'message';
    let data = '';

    for (const line of raw.split('\n')) {
      if (line.startsWith('event: ')) event = line.slice(7);
      if (line.startsWith('data: ')) data += line.slice(6);
    }

    if (data === '[DONE]') return;

    try {
      this.onMessage({ event, data: JSON.parse(data) });
    } catch {
      this.onMessage({ event, data });
    }
  }

  #scheduleRetry(body, delay) {
    if (this.#retryCount >= this.#maxRetries) {
      this.onError(new Error('超过最大重试次数'));
      return;
    }

    const waitTime = delay || this.#baseDelay * Math.pow(2, this.#retryCount);
    this.#retryCount++;
    console.log(`SSE 将在 ${waitTime}ms 后重连 (第 ${this.#retryCount} 次)`);
    setTimeout(() => this.connect(body), waitTime);
  }

  close() {
    this.#controller?.abort();
  }
}

// 使用示例
const client = new SSEClient('/api/chat-stream', {
  headers: { Authorization: 'Bearer xxx' },
  onMessage: ({ event, data }) => {
    if (event === 'token') appendToUI(data.token);
  },
  onError: (err) => showToast(err.message),
});
client.connect({ message: '你好' });

这个封装处理了生产环境中最关键的几个问题:指数退避重连(避免网络抖动时疯狂重试)、429 限流响应处理、连接中止与资源清理、以及二进制 buffer 拼接导致的消息边界问题。

💡 **提示:**指数退避(Exponential Backoff)的计算公式是 baseDelay × 2^retryCount,即第 1 次等 1 秒,第 2 次等 2 秒,第 3 次等 4 秒。这避免了大量客户端同时重连导致的「雷群效应」(Thundering Herd)。

🔑 五、总结与技术选型建议

SSE 在 AI 时代焕发出新的生命力,但它并非银弹。以下是明确的选型决策框架:

✅ **选择 SSE 的场景:**AI/LLM 流式输出、实时通知推送、日志流、股票行情、进度更新、服务端事件广播

❌ **不选 SSE 的场景:**双向实时通信(聊天室)、高频交互(游戏)、二进制数据传输(音视频)、需要低延迟双向推送(协同编辑)

💰 **成本优势:**SSE 基于普通 HTTP 连接,无需独立的 WebSocket 网关,CDN 和负载均衡器零配置支持。对于大多数推送场景,SSE 的基础设施成本比 WebSocket 低 30-50%。

相关工具推荐:

  • 🔧 EventSource polyfilleventsource-polyfill — 兼容旧浏览器和 Node.js
  • 🔧 H3/Nitro:Nuxt 3 的服务端框架,原生支持 Node.js Response 流操作
  • 🔧 PartyKit:基于 Cloudflare Workers 的实时应用框架,内置 SSE 支持
  • 🔧 Mercure:基于 SSE 的实时推送协议,自带 Hub 和授权机制
  • 🔧 json-formatter:开发调试 SSE 时,用 jsjson.com/json-format 格式化 data 字段中的 JSON 数据,提升调试效率

SSE 的美在于它的简单——一个 HTTP 连接,一种文本格式,一套标准 API。在过度工程化的今天,选择最简单的可行方案,往往才是最好的工程决策。

📚 相关文章