Server-Sent Events 实战指南:AI 时代的实时流式通信利器

深入解析 SSE(Server-Sent Events)在 AI 流式输出、实时通知、数据推送等场景的实战应用,对比 WebSocket,附完整代码示例与生产环境最佳实践。

前端开发 2026-05-29 12 分钟

2025 年至今,几乎所有的 AI 大模型 API(OpenAI、Claude、Gemini、国产大模型)都采用 Server-Sent Events(SSE)作为流式输出的默认协议。据统计,超过 90% 的 LLM API 调用使用流式模式,而 SSE 正是这背后的传输基石。如果你还在用轮询或长连接来处理实时数据,那这篇文章会让你彻底改变认知——SSE 的简洁性和可靠性远超大多数开发者的想象。

🔍 一、SSE 核心原理与技术细节

Server-Sent Events 是 W3C 标准(HTML5 规范的一部分),基于 HTTP/1.1 的长连接,实现服务器到客户端的单向实时推送。与 WebSocket 不同,SSE 天然支持自动重连和事件 ID 追踪,这在生产环境中是巨大的优势。

📋 协议格式详解

SSE 的数据格式非常简洁,基于纯文本的 text/event-stream MIME 类型。每条消息由一个或多个字段组成,字段之间用 \n 分隔,消息之间用 \n\n 分隔。

// SSE 消息格式
data: 第一行数据\n
data: 第二行数据\n
event: message-type\n
id: 12345\n
retry: 5000\n
\n

四个核心字段的含义:

字段 作用 是否必须 说明
data 消息内容 ✅ 必须 多行用多个 data:,客户端用 \n 拼接
event 事件类型 ❌ 可选 默认为 message,客户端按类型监听
id 事件 ID ❌ 可选 断线重连时通过 Last-Event-ID 请求头回传
retry 重连间隔 ❌ 可选 单位为毫秒,客户端自动采用

📋 完整的服务端实现(Node.js)

以下是一个生产级的 SSE 服务端实现,包含心跳保活、客户端管理和错误处理:

// Node.js 原生实现 SSE 服务端(无需框架依赖)
const http = require('http');

class SSEManager {
  constructor() {
    // 存储所有活跃的客户端连接
    this.clients = new Map();
    this.heartbeatInterval = 30000; // 30 秒心跳
    this._startHeartbeat();
  }

  // 处理新的 SSE 连接
  handleConnection(req, res) {
    // 设置 SSE 必需的响应头
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',           // 禁止缓存
      'Connection': 'keep-alive',             // 保持连接
      'X-Accel-Buffering': 'no',              // 禁用 Nginx 缓冲
      'Access-Control-Allow-Origin': '*',     // CORS
    });

    // 从 URL 或 Header 中提取客户端 ID
    const clientId = req.headers['x-client-id'] || crypto.randomUUID();

    // 注册客户端
    this.clients.set(clientId, { res, connectedAt: Date.now() });
    console.log(`客户端 ${clientId} 已连接,当前在线: ${this.clients.size}`);

    // 支持断线重连:读取 Last-Event-ID
    const lastEventId = req.headers['last-event-id'];
    if (lastEventId) {
      console.log(`客户端 ${clientId} 从事件 ${lastEventId} 恢复`);
      // 这里可以重放 lastEventId 之后的事件
    }

    // 发送初始连接确认
    this.send(clientId, { type: 'connected', clientId });

    // 监听客户端断开
    req.on('close', () => {
      this.clients.delete(clientId);
      console.log(`客户端 ${clientId} 已断开,当前在线: ${this.clients.size}`);
    });

    req.on('error', () => {
      this.clients.delete(clientId);
    });
  }

  // 向指定客户端发送事件
  send(clientId, data, eventType = 'message', eventId = null) {
    const client = this.clients.get(clientId);
    if (!client) return false;

    let message = '';
    if (eventId) message += `id: ${eventId}\n`;
    if (eventType !== 'message') message += `event: ${eventType}\n`;

    // 支持多行 data
    const dataStr = typeof data === 'string' ? data : JSON.stringify(data);
    dataStr.split('\n').forEach(line => {
      message += `data: ${line}\n`;
    });

    message += '\n'; // 消息结尾的空行

    try {
      client.res.write(message);
      return true;
    } catch (err) {
      this.clients.delete(clientId);
      return false;
    }
  }

  // 广播给所有客户端
  broadcast(data, eventType = 'message') {
    let sent = 0;
    for (const [clientId] of this.clients) {
      if (this.send(clientId, data, eventType)) sent++;
    }
    return sent;
  }

  // 心跳保活,防止代理/防火墙断开空闲连接
  _startHeartbeat() {
    setInterval(() => {
      for (const [clientId, client] of this.clients) {
        try {
          client.res.write(': heartbeat\n\n');  // 注释行作为心跳
        } catch (err) {
          this.clients.delete(clientId);
        }
      }
    }, this.heartbeatInterval);
  }
}

// 启动服务
const sse = new SSEManager();
const server = http.createServer((req, res) => {
  if (req.url === '/events') {
    sse.handleConnection(req, res);
  } else if (req.url === '/push' && req.method === 'POST') {
    // 模拟推送消息
    let body = '';
    req.on('data', chunk => body += chunk);
    req.on('end', () => {
      const count = sse.broadcast(JSON.parse(body));
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ sent: count }));
    });
  } else {
    res.writeHead(404);
    res.end('Not Found');
  }
});

server.listen(3001, () => console.log('SSE 服务运行在 :3001'));

📌 **记住:**SSE 的响应头中 Cache-Control: no-cacheX-Accel-Buffering: no 缺一不可,否则 Nginx、CDN 等中间层会缓存数据,导致客户端收不到实时消息。

📋 前端客户端实现

浏览器原生的 EventSource API 虽然简洁,但有两个致命缺陷:不支持自定义请求头(无法携带 Authorization)和只能使用 GET 方法。下面分别展示原生用法和生产级封装:

// ❌ 原生 EventSource 的局限性
const source = new EventSource('/events');
source.onmessage = (e) => console.log('收到:', e.data);
source.onerror = (e) => console.error('连接错误');

// ❌ 问题:无法携带 Authorization 头,无法 POST body
// 对于需要认证的 API 场景完全不够用

// ✅ 生产级 SSE 客户端(支持认证、重连、超时)
class SSEClient {
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      headers: options.headers || {},
      retryInterval: options.retryInterval || 3000,
      maxRetries: options.maxRetries || 10,
      timeout: options.timeout || 120000, // 2 分钟超时
      body: options.body || null,
      method: options.method || 'GET',
    };
    this.controller = null;
    this.retryCount = 0;
    this.listeners = new Map();
    this.lastEventId = null;
  }

  // 用 fetch 实现 SSE(支持自定义 header 和 POST)
  async connect() {
    this.controller = new AbortController();

    const headers = {
      'Accept': 'text/event-stream',
      'Cache-Control': 'no-cache',
      ...this.options.headers,
    };

    // 断线重连时传递 Last-Event-ID
    if (this.lastEventId) {
      headers['Last-Event-ID'] = this.lastEventId;
    }

    try {
      const response = await fetch(this.url, {
        method: this.options.method,
        headers,
        body: this.options.body ? JSON.stringify(this.options.body) : undefined,
        signal: this.controller.signal,
      });

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${response.statusText}`);
      }

      this.retryCount = 0; // 连接成功,重置重试计数
      await this._readStream(response.body);
    } catch (err) {
      if (err.name === 'AbortError') return; // 主动断开
      console.error(`SSE 连接失败: ${err.message}`);
      this._scheduleReconnect();
    }
  }

  // 逐行解析 SSE 流
  async _readStream(body) {
    const reader = body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    let eventType = 'message';
    let eventId = null;
    let eventData = '';

    // 超时检测
    let timeoutId = this._resetTimeout();

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      clearTimeout(timeoutId);
      timeoutId = this._resetTimeout();

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // 保留未完成的行

      for (const line of lines) {
        if (line.startsWith('event:')) {
          eventType = line.slice(6).trim();
        } else if (line.startsWith('id:')) {
          eventId = line.slice(3).trim();
          this.lastEventId = eventId;
        } else if (line.startsWith('data:')) {
          eventData += (eventData ? '\n' : '') + line.slice(5).trim();
        } else if (line === '') {
          // 空行 = 消息结束,触发事件
          if (eventData) {
            this._emit(eventType, {
              data: eventData,
              id: eventId,
              type: eventType,
            });
          }
          eventType = 'message';
          eventId = null;
          eventData = '';
        }
        // 注释行(: 开头)是心跳,忽略
      }
    }

    clearTimeout(timeoutId);
  }

  _resetTimeout() {
    return setTimeout(() => {
      console.warn('SSE 超时,断开重连');
      this.disconnect();
      this._scheduleReconnect();
    }, this.options.timeout);
  }

  _scheduleReconnect() {
    if (this.retryCount >= this.options.maxRetries) {
      this._emit('fatal', { data: '达到最大重试次数' });
      return;
    }
    this.retryCount++;
    const delay = this.options.retryInterval * Math.min(this.retryCount, 5);
    console.log(`第 ${this.retryCount} 次重连,${delay}ms 后...`);
    setTimeout(() => this.connect(), delay);
  }

  on(event, callback) {
    if (!this.listeners.has(event)) this.listeners.set(event, []);
    this.listeners.get(event).push(callback);
    return this;
  }

  _emit(event, data) {
    const callbacks = this.listeners.get(event) || [];
    callbacks.forEach(cb => cb(data));
    // 同时触发通配符
    const wildcard = this.listeners.get('*') || [];
    wildcard.forEach(cb => cb({ ...data, event }));
  }

  disconnect() {
    if (this.controller) this.controller.abort();
  }
}

// 使用示例:连接需要认证的 AI 流式 API
const sse = new SSEClient('https://api.example.com/chat/stream', {
  headers: { 'Authorization': 'Bearer sk-xxxx' },
  method: 'POST',
  body: { model: 'gpt-4', messages: [{ role: 'user', content: '你好' }] },
  retryInterval: 2000,
  maxRetries: 5,
});

sse.on('message', (e) => {
  const chunk = JSON.parse(e.data);
  document.getElementById('output').textContent += chunk.choices[0]?.delta?.content || '';
});

sse.on('done', () => {
  console.log('流式输出完成');
  sse.disconnect();
});

sse.on('fatal', () => alert('连接不可用,请刷新页面'));

sse.connect();

🤖 二、SSE 在 AI 流式场景中的深度实践

当前几乎所有主流 LLM API 都使用 SSE 作为流式传输协议。理解 SSE 在 AI 场景中的特殊处理方式,是每个现代后端开发者的基本功。

📋 AI 流式协议的统一规范

不同厂商的 AI API 在 SSE 事件格式上高度趋同,都遵循 OpenAI 首创的 chat.completion.chunk 模式:

// 统一的 AI 流式消费端:兼容 OpenAI / Claude / 国产大模型
async function streamAIChat(apiUrl, apiKey, messages) {
  const response = await fetch(apiUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${apiKey}`,
      'Accept': 'text/event-stream',
    },
    body: JSON.stringify({
      model: 'deepseek-chat',
      messages,
      stream: true,  // 关键参数:启用流式
    }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullContent = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || '';

    for (const line of lines) {
      const trimmed = line.trim();

      // 跳过空行和非 data 行
      if (!trimmed || !trimmed.startsWith('data:')) continue;

      const dataStr = trimmed.slice(5).trim();

      // 流结束标记
      if (dataStr === '[DONE]') {
        return fullContent;
      }

      try {
        const parsed = JSON.parse(dataStr);
        const delta = parsed.choices?.[0]?.delta?.content || '';
        fullContent += delta;

        // 实时渲染到页面
        yield { content: delta, done: false };
      } catch (e) {
        // 忽略无法解析的行(可能是心跳或格式错误)
        console.warn('SSE 解析跳过:', dataStr);
      }
    }
  }

  return fullContent;
}

// 在 Vue/React 中使用
async function handleChat() {
  const stream = streamAIChat(
    'https://api.deepseek.com/v1/chat/completions',
    'sk-your-key',
    [{ role: 'user', content: '用 JavaScript 实现一个 LRU 缓存' }]
  );

  for await (const chunk of stream) {
    // 逐 token 渲染到 UI
    outputElement.textContent += chunk.content;
  }
}

⚠️ **警告:**AI 流式 API 中,[DONE] 标记是 OpenAI 规范的约定,但不是所有厂商都遵循。Claude API 使用 message_stop 事件类型,国产大模型各有差异。务必针对每个 API 文档做兼容处理。

📋 性能对比:SSE vs WebSocket vs 长轮询

在 AI 流式场景中选择传输协议,直接影响用户体验和服务器成本:

维度 SSE WebSocket 长轮询
连接建立开销 HTTP 1 次握手 HTTP + 升级握手 每次 HTTP 握手
消息延迟 50-100ms 10-50ms 500ms-2s
服务器内存/连接 低(单向) 中(双向) 高(反复创建)
自动重连 ✅ 内置 ❌ 需手动实现 ✅ 天然支持
代理/CDN 兼容 ✅ 完美(标准 HTTP) ⚠️ 需配置 ✅ 完美
浏览器并发限制 ⚠️ 6 个域名限制 无限制 无限制
适合场景 服务器→客户端单向推送 双向实时通信 兼容性兜底方案

⚡ **关键结论:**对于 AI 流式输出这种「服务器单向推送」的场景,SSE 是最优解。它基于标准 HTTP,不需要特殊的代理配置,天然支持 CDN 和负载均衡,且代码复杂度远低于 WebSocket。

📋 后端代理模式:统一转发 LLM 流

实际项目中,前端不会直连 LLM API,而是通过自己的后端做代理。下面是一个 Express + SSE 的完整代理实现:

// 后端代理 LLM 流式响应,同时做 Token 计量和日志
const express = require('express');
const app = express();
app.use(express.json());

app.post('/api/chat/stream', async (req, res) => {
  const { messages, model = 'deepseek-chat' } = req.body;
  const userId = req.user?.id; // 从 JWT 中获取

  // 设置 SSE 响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no',
  });

  let totalTokens = 0;
  let fullResponse = '';

  try {
    const llmResponse = await fetch('https://api.deepseek.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.LLM_API_KEY}`,
      },
      body: JSON.stringify({ model, messages, stream: true }),
    });

    const reader = llmResponse.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    // 客户端断开时清理上游连接
    let aborted = false;
    req.on('close', () => { aborted = true; reader.cancel(); });

    while (!aborted) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() || '';

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed.startsWith('data:')) continue;

        // 直接透传 LLM 的 SSE 数据给前端
        res.write(trimmed + '\n\n');

        // 同时解析内容做计量
        const dataStr = trimmed.slice(5).trim();
        if (dataStr !== '[DONE]') {
          try {
            const parsed = JSON.parse(dataStr);
            const content = parsed.choices?.[0]?.delta?.content || '';
            fullResponse += content;
            totalTokens += parsed.usage?.total_tokens || 0;
          } catch (e) { /* 心跳等非 JSON 行 */ }
        }
      }
    }

    // 流结束后记录日志和用量
    console.log(`用户 ${userId} | 模型 ${model} | tokens: ${totalTokens}`);
    await saveUsageLog(userId, model, totalTokens, fullResponse);

  } catch (err) {
    console.error('LLM 代理错误:', err);
    res.write(`event: error\ndata: ${JSON.stringify({ error: '服务暂时不可用' })}\n\n`);
  }

  res.end();
});

💡 **提示:**上游 LLM API 超时通常为 120 秒。如果用户在流式过程中断开连接(req.on('close')),务必立即取消上游的 reader,避免浪费 API 额度。

🛡️ 三、生产环境避坑指南

📋 坑点 1:Nginx 缓冲导致消息延迟

这是 SSE 部署中最常见也最隐蔽的问题。Nginx 默认会缓冲 upstream 响应,导致所有 SSE 消息被攒到一起发送,客户端体验为「卡了半天突然一大段」。

# Nginx 反向代理 SSE 的正确配置
location /api/chat/stream {
    proxy_pass http://backend;

    # ✅ 关键配置:禁用代理缓冲
    proxy_buffering off;

    # ✅ 关键配置:禁用 chunked 传输编码转换
    proxy_http_version 1.1;
    proxy_set_header Connection '';

    # ✅ 设置较长的超时(AI 流式可能持续 2 分钟+)
    proxy_read_timeout 300s;
    proxy_send_timeout 300s;

    # ✅ 传递客户端真实 IP 和 Last-Event-ID
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header Last-Event-ID $http_last_event_id;
}

📋 坑点 2:浏览器 6 连接限制

HTTP/1.1 规范限制浏览器对同一域名最多 6 个并发连接。如果一个 SSE 连接长期占用,其他页面请求可能被阻塞。

场景 影响 解决方案
用户打开多个标签页 第 7 个标签页 SSE 连不上 使用 BroadcastChannel 共享连接,或升级 HTTP/2
SSE 长连接期间发 XHR 请求排队等待 为 SSE 使用独立子域名(如 sse.example.com
HTTP/2 环境 无限制 HTTP/2 多路复用消除了 6 连接限制
// 用 BroadcastChannel 在多标签页间共享一个 SSE 连接
const channel = new BroadcastChannel('sse-shared');
let isLeader = false;
const lock = new BroadcastChannel('sse-lock');

// 简单的主标签页选举
navigator.locks?.request('sse-connection', async () => {
  isLeader = true;
  const sse = new SSEClient('/events');
  sse.on('message', (e) => {
    // 主标签页收到消息后广播给其他标签页
    channel.postMessage(JSON.parse(e.data));
  });
  sse.connect();
  // 持有锁直到标签页关闭
  await new Promise(() => {});
});

// 非主标签页通过 BroadcastChannel 接收
channel.onmessage = (e) => {
  handleSSEMessage(e.data);
};

📋 坑点 3:错误处理与优雅降级

SSE 连接可能因网络波动、服务重启、负载均衡器超时等原因断开。生产环境必须实现完善的错误处理链路:

// 生产级错误处理策略
class RobustSSEClient {
  // 三级降级策略
  async connectWithFallback() {
    // 第一级:尝试 SSE(最优体验)
    try {
      await this.connectSSE();
      return 'sse';
    } catch (e) {
      console.warn('SSE 不可用,降级到长轮询');
    }

    // 第二级:降级到长轮询(兼容性最好)
    try {
      await this.connectLongPolling();
      return 'long-polling';
    } catch (e) {
      console.warn('长轮询不可用,降级到短轮询');
    }

    // 第三级:降级到短轮询(最保底)
    this.connectShortPolling();
    return 'short-polling';
  }

  // SSE 连接(带指数退避重试)
  async connectSSE() {
    const MAX_RETRY = 5;
    for (let i = 0; i < MAX_RETRY; i++) {
      try {
        const client = new SSEClient(this.url, {
          headers: this.headers,
          timeout: 60000,
        });
        await client.connect();
        return client;
      } catch (err) {
        // 指数退避:1s, 2s, 4s, 8s, 16s
        const delay = Math.min(1000 * Math.pow(2, i), 16000);
        // 添加随机抖动,避免雷群效应
        const jitter = delay * 0.5 * Math.random();
        await new Promise(r => setTimeout(r, delay + jitter));
      }
    }
    throw new Error('SSE 连接失败,已重试 5 次');
  }

  // 长轮询降级
  async connectLongPolling() {
    while (true) {
      const response = await fetch(`${this.url}/poll?timeout=30000`, {
        headers: this.headers,
      });
      const events = await response.json();
      events.forEach(event => this._emit('message', event));
    }
  }

  // 短轮询兜底
  connectShortPolling(interval = 5000) {
    this._pollTimer = setInterval(async () => {
      const response = await fetch(`${this.url}/snapshot`, {
        headers: this.headers,
      });
      const data = await response.json();
      this._emit('message', data);
    }, interval);
  }
}

⚠️ **警告:**永远不要在 SSE 连接断开后立即重连。无间隔重连会导致「雷群效应」——如果服务端重启,所有客户端同时重连可能瞬间打垮服务。务必使用指数退避(Exponential Backoff)加随机抖动(Jitter)。

📊 四、实战案例:构建实时 AI 对话组件

将前面的所有知识整合,构建一个完整的 Vue 3 流式对话组件:

<!-- StreamChat.vue — Vue 3 组合式 API 实现的流式对话组件 -->
<script setup>
import { ref, reactive, onUnmounted, nextTick } from 'vue'

const messages = reactive([])
const inputText = ref('')
const isStreaming = ref(false)
const connectionStatus = ref('disconnected') // disconnected | connecting | connected
const outputRef = ref(null)
let controller = null

async function sendMessage() {
  if (!inputText.value.trim() || isStreaming.value) return

  const userMsg = { role: 'user', content: inputText.value }
  messages.push(userMsg)
  inputText.value = ''
  isStreaming.value = true
  connectionStatus.value = 'connecting'

  // 创建 AI 回复占位
  const aiMsg = reactive({ role: 'assistant', content: '', status: 'streaming' })
  messages.push(aiMsg)

  controller = new AbortController()

  try {
    const response = await fetch('/api/chat/stream', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ messages: messages.slice(0, -1) }),
      signal: controller.signal,
    })

    connectionStatus.value = 'connected'
    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let buffer = ''

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (!line.trim().startsWith('data:')) continue
        const data = line.trim().slice(5).trim()
        if (data === '[DONE]') continue

        try {
          const parsed = JSON.parse(data)
          const delta = parsed.choices?.[0]?.delta?.content || ''
          aiMsg.content += delta
          // 自动滚动到底部
          await nextTick()
          outputRef.value?.scrollTo({ top: outputRef.value.scrollHeight })
        } catch (e) { /* 忽略 */ }
      }
    }

    aiMsg.status = 'done'
  } catch (err) {
    if (err.name !== 'AbortError') {
      aiMsg.content = `❌ 请求失败: ${err.message}`
      aiMsg.status = 'error'
    }
  } finally {
    isStreaming.value = false
    connectionStatus.value = 'disconnected'
  }
}

function stopStreaming() {
  controller?.abort()
  isStreaming.value = false
}

onUnmounted(() => controller?.abort())
</script>

<template>
  <div class="chat-container">
    <div class="status-bar">
      <span :class="connectionStatus">
        {{ { disconnected: '⚪ 未连接', connecting: '🟡 连接中...', connected: '🟢 已连接' }[connectionStatus] }}
      </span>
    </div>
    <div class="messages" ref="outputRef">
      <div v-for="(msg, i) in messages" :key="i" :class="msg.role">
        <pre>{{ msg.content }}<span v-if="msg.status === 'streaming'" class="cursor">▋</span></pre>
      </div>
    </div>
    <form @submit.prevent="sendMessage" class="input-area">
      <input v-model="inputText" placeholder="输入消息..." :disabled="isStreaming" />
      <button v-if="!isStreaming" type="submit">发送</button>
      <button v-else type="button" @click="stopStreaming">停止</button>
    </form>
  </div>
</template>

💡 五、最佳实践总结

经过大量生产实践,以下是 SSE 开发的核心 checklist:

✅ 推荐做法:

  • ✅ 始终设置 X-Accel-Buffering: no 响应头,防止 Nginx/CDN 缓冲
  • ✅ 实现心跳机制(每 30 秒发送注释行 : heartbeat),防止代理超时断连
  • ✅ 使用 Last-Event-ID 实现断线续传,不丢失任何事件
  • ✅ 采用指数退避 + 随机抖动的重连策略
  • ✅ 为 SSE 连接设置独立子域名,避免阻塞同域 HTTP 请求
  • ✅ 上游客户端断开时,立即取消下游连接,节省资源

❌ 避免做法:

  • ❌ 使用原生 EventSource 连接需要认证的 API(不支持自定义 Header)
  • ❌ 断线后立即重连(会导致雷群效应)
  • ❌ 在 Nginx 中使用默认的 proxy_buffering on(消息延迟严重)
  • ❌ 忽略 HTTP/2 环境下 6 连接限制已解除的事实

🔧 SSE 最佳搭档工具:

工具 用途 推荐指数
jsjson.com JSON 格式化 在线格式化 SSE 返回的 JSON 数据 ⭐⭐⭐⭐⭐
EventSource Polyfill 兼容旧浏览器 + 支持自定义 Header ⭐⭐⭐⭐
SSE.js (npm) 轻量级 Node.js SSE 客户端库 ⭐⭐⭐⭐
Vercel AI SDK 开箱即用的 AI 流式 UI 组件 ⭐⭐⭐⭐⭐

SSE 不是什么新技术,但在 AI 流式输出成为标配的 2026 年,它焕发了新生。相比 WebSocket 的重量级双向通道,SSE 以其极简的协议、原生的重连机制、标准 HTTP 的兼容性,成为了服务器单向推送场景的最优解。掌握本文的实战模式,你就具备了构建任何实时流式应用的能力。

📚 相关文章