Server-Sent Events 深度实战:构建生产级实时推送系统

从 SSE 原理到生产落地,详解浏览器原生 EventSource API、Node.js 服务端实现、与 LLM 流式输出集成,以及自动重连、心跳保活、多客户端广播等核心模式。

前端开发 2026-05-28 15 分钟

2026 年,随着 LLM 应用的爆发式增长,Server-Sent Events(SSE)从一个被低估的浏览器 API 一跃成为最热门的实时通信方案。OpenAI、Anthropic、Google 的 LLM API 全部采用 SSE 实现流式输出,GitHub Copilot 的实时补全也基于 SSE。与 WebSocket 需要自定义协议、双向握手不同,SSE 只需一个普通的 HTTP 响应流,天然兼容 CDN、反向代理和 Serverless 平台。如果你的场景是「服务端向客户端单向推送数据」,SSE 是 2026 年最优选择——它的复杂度只有 WebSocket 的 1/3,可靠性却更高。

📡 一、SSE 核心原理与 EventSource API

1.1 SSE 协议本质

SSE 不是一个独立协议,而是一种基于 HTTP 的流式数据格式。服务端返回 Content-Type: text/event-stream 响应头,然后持续写入以 \n\n 分隔的文本块。每个事件块包含以下字段:

event: message       # 事件类型(可选,默认 message)
id: 123              # 事件 ID(用于断线重连)
retry: 3000          # 重连间隔,毫秒(浏览器自动使用)
data: {"key":"val"}  # 事件数据(必须字段)

💡 **提示:**SSE 的数据格式是纯文本,不支持二进制传输。如果你需要推送二进制数据(如图片、音频),请使用 WebSocket。SSE 的设计哲学是「简单即可靠」——HTTP 是互联网上最稳定的传输层。

1.2 浏览器端 EventSource API

浏览器原生的 EventSource API 提供了自动重连、事件类型解析和 ID 追踪等开箱即用的功能:

// 浏览器端 SSE 客户端 — 完整示例
const evtSource = new EventSource('/api/events', {
  withCredentials: true  // 跨域时携带 Cookie
})

// 监听默认 message 事件
evtSource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  console.log('收到消息:', data)
  updateUI(data)
}

// 监听自定义事件类型
evtSource.addEventListener('heartbeat', (event) => {
  console.log('心跳:', event.data)
})

evtSource.addEventListener('alert', (event) => {
  const alert = JSON.parse(event.data)
  showAlert(alert.level, alert.message)
})

// 错误处理与连接状态
evtSource.onerror = (event) => {
  if (evtSource.readyState === EventSource.CONNECTING) {
    console.log('连接断开,正在重连...')
  } else if (evtSource.readyState === EventSource.CLOSED) {
    console.log('连接已关闭')
  }
}

// 手动关闭连接
// evtSource.close()

1.3 SSE vs WebSocket vs Long Polling 对比

维度 SSE WebSocket Long Polling
通信方向 服务端 → 客户端(单向) 双向 服务端 → 客户端(模拟)
协议 HTTP/1.1 或 HTTP/2 WebSocket (RFC 6455) HTTP
自动重连 ✅ 浏览器内置 ❌ 需手动实现 ❌ 需手动实现
事件 ID 追踪 ✅ 内置 ❌ 需自定义 ❌ 需自定义
二进制支持 ❌ 仅文本 ❌ 仅文本
CDN/代理兼容 ✅ 天然兼容 ⚠️ 可能被拦截 ✅ 兼容
Serverless 支持 ✅(部分平台) ❌ 需要长连接 ⚠️ 超时问题
连接开销 低(单个 HTTP 连接) 中(握手开销) 高(频繁建立连接)
浏览器支持 所有现代浏览器 所有现代浏览器 所有浏览器

⚡ **关键结论:**如果你的场景是「服务端单向推送」(如通知、日志、LLM 流式输出、股票行情),SSE 是最佳选择。只有在需要客户端向服务端频繁发送数据时(如聊天、协同编辑),才需要 WebSocket。

🔧 二、Node.js 服务端 SSE 实现

2.1 原生 HTTP 模块实现

不依赖任何框架,用 Node.js 原生 API 实现一个生产级 SSE 服务端:

// server-native.js — Node.js 原生 SSE 服务端
import http from 'node:http'

// 客户端连接池
const clients = new Set()

function sseHandler(req, res) {
  // 设置 SSE 响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',       // 禁止 CDN 缓存
    'Connection': 'keep-alive',         // 保持长连接
    'X-Accel-Buffering': 'no',          // 禁用 Nginx 缓冲
    'Access-Control-Allow-Origin': '*'  // CORS
  })

  // 注册客户端
  clients.add(res)
  console.log(`客户端连接,当前 ${clients.size} 个`)

  // 发送初始连接确认
  res.write('event: connected\ndata: {"status":"ok"}\n\n')

  // 心跳保活:每 15 秒发送一次
  const heartbeat = setInterval(() => {
    res.write(': heartbeat\n\n')  // 注释行(不会触发事件)
  }, 15000)

  // 客户端断开时清理
  req.on('close', () => {
    clearInterval(heartbeat)
    clients.delete(res)
    console.log(`客户端断开,剩余 ${clients.size} 个`)
  })
}

// 广播函数:向所有客户端推送消息
function broadcast(event, data) {
  const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
  for (const client of clients) {
    client.write(message)
  }
}

// 启动服务
const server = http.createServer((req, res) => {
  if (req.url === '/events' && req.method === 'GET') {
    sseHandler(req, res)
  } else {
    res.writeHead(404)
    res.end('Not Found')
  }
})

// 模拟业务推送
setInterval(() => {
  broadcast('update', {
    timestamp: Date.now(),
    online: clients.size
  })
}, 5000)

server.listen(3000, () => console.log('SSE 服务运行在 :3000/events'))

⚠️ **警告:**生产环境必须设置 X-Accel-Buffering: no,否则 Nginx 会缓冲整个响应再一次性发送,导致客户端收不到实时推送。这是 SSE 在生产环境最常见的「坑」。

2.2 Hono 框架实现

使用 Hono 框架(轻量、TypeScript 友好)实现 SSE,代码更简洁:

// server-hono.ts — Hono SSE 服务端
import { Hono } from 'hono'
import { streamSSE } from 'hono/streaming'

const app = new Hono()
const clients = new Set<string>()

app.get('/events', (c) => {
  return streamSSE(c, async (stream) => {
    const clientId = crypto.randomUUID()
    clients.add(clientId)
    
    // 发送连接确认
    await stream.writeSSE({
      event: 'connected',
      data: JSON.stringify({ clientId }),
      id: '0'
    })

    let id = 1
    const heartbeat = setInterval(async () => {
      try {
        await stream.writeSSE({ data: '', comment: 'heartbeat' })
      } catch {
        clearInterval(heartbeat)
      }
    }, 15000)

    // 监听取消事件(客户端断开)
    stream.onAbort(() => {
      clearInterval(heartbeat)
      clients.delete(clientId)
    })

    // 保持连接打开,等待业务事件推送
    await new Promise<void>((resolve) => {
      stream.onAbort(resolve)
    })
  })
})

export default app

2.3 带重连与事件 ID 追踪的完整方案

生产级 SSE 需要解决一个关键问题:断线重连后如何补发丢失的事件? 答案是使用事件 ID + 内存环形缓冲区:

// sse-with-resumability.ts — 可恢复的 SSE 实现
import { Hono } from 'hono'
import { streamSSE } from 'hono/streaming'

interface SSEEvent {
  id: string
  event: string
  data: string
  timestamp: number
}

// 环形缓冲区:保留最近 1000 条事件
class EventBuffer {
  private buffer: SSEEvent[] = []
  private idCounter = 0
  private readonly maxSize: number

  constructor(maxSize = 1000) {
    this.maxSize = maxSize
  }

  push(event: string, data: string): SSEEvent {
    const entry: SSEEvent = {
      id: String(++this.idCounter),
      event,
      data,
      timestamp: Date.now()
    }
    this.buffer.push(entry)
    if (this.buffer.length > this.maxSize) {
      this.buffer.shift()
    }
    return entry
  }

  // 获取指定 ID 之后的所有事件(用于重连补发)
  getAfter(lastId: string): SSEEvent[] {
    const idx = this.buffer.findIndex((e) => e.id === lastId)
    if (idx === -1) return this.buffer.slice(-50) // 找不到则返回最近 50 条
    return this.buffer.slice(idx + 1)
  }
}

const eventBuffer = new EventBuffer()

const app = new Hono()

app.get('/events', (c) => {
  // 从 URL 参数获取上次断线时的事件 ID
  const lastEventId = c.req.query('lastEventId') || c.req.header('Last-Event-ID') || '0'

  return streamSSE(c, async (stream) => {
    // 补发断线期间错过的事件
    const missedEvents = eventBuffer.getAfter(lastEventId)
    for (const evt of missedEvents) {
      await stream.writeSSE({
        id: evt.id,
        event: evt.event,
        data: evt.data
      })
    }

    // 监听新事件
    await new Promise<void>((resolve) => {
      stream.onAbort(resolve)
    })
  })
})

// 推送事件时写入缓冲区
function pushEvent(event: string, data: string) {
  eventBuffer.push(event, data)
}

export { pushEvent }

📌 记住:EventSource 重连时会自动携带 Last-Event-ID 请求头。你只需在服务端读取这个头,补发之后的事件即可实现「无缝续传」。这是 SSE 区别于 WebSocket 的核心优势之一。

🚀 三、实战场景与最佳实践

3.1 LLM 流式输出集成

SSE 是 LLM 流式输出的事实标准。OpenAI 的 Chat Completions API 就使用 SSE 逐 token 返回:

// llm-streaming.ts — 前端接收 LLM 流式输出
const response = await fetch('/api/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    messages: [{ role: 'user', content: '解释量子计算' }]
  })
})

const reader = response.body!.getReader()
const decoder = new TextDecoder()
let fullText = ''

while (true) {
  const { done, value } = await reader.read()
  if (done) break

  // SSE 格式解析:按行处理
  const chunk = decoder.decode(value, { stream: true })
  const lines = chunk.split('\n')

  for (const line of lines) {
    if (!line.startsWith('data: ')) continue
    const data = line.slice(6)  // 去掉 "data: " 前缀

    if (data === '[DONE]') {
      console.log('流式输出完成')
      break
    }

    try {
      const parsed = JSON.parse(data)
      const token = parsed.choices?.[0]?.delta?.content || ''
      fullText += token
      // 实时更新 UI
      renderToken(token)
    } catch {
      // 忽略不完整的 JSON(跨 chunk 拼接问题)
    }
  }
}

function renderToken(token: string) {
  const output = document.getElementById('output')!
  output.textContent += token
  output.scrollTop = output.scrollHeight  // 自动滚动到底部
}

3.2 进度推送与任务监控

长时间运行的任务(如文件处理、数据导出)可以通过 SSE 实时推送进度:

// task-progress.ts — 服务端推送任务进度
import { Hono } from 'hono'
import { streamSSE } from 'hono/streaming'

const app = new Hono()

app.post('/export', async (c) => {
  const taskId = crypto.randomUUID()
  
  // 异步启动任务
  runExportTask(taskId)
  
  return c.json({ taskId })
})

app.get('/export/:taskId/progress', (c) => {
  const taskId = c.req.param('taskId')
  
  return streamSSE(c, async (stream) => {
    // 轮询任务进度(生产环境建议用 pub/sub)
    const check = setInterval(async () => {
      const progress = getTaskProgress(taskId)
      
      await stream.writeSSE({
        event: progress.status,
        id: String(progress.step),
        data: JSON.stringify({
          step: progress.step,
          total: progress.total,
          percent: Math.round((progress.step / progress.total) * 100),
          message: progress.message
        })
      })

      if (progress.status === 'completed' || progress.status === 'failed') {
        clearInterval(check)
        stream.close()
      }
    }, 500)

    stream.onAbort(() => clearInterval(check))
  })
})

3.3 生产环境部署清单

配置项 推荐值 说明
心跳间隔 15-30 秒 防止代理/防火墙超时断连
Nginx proxy_buffering off 必须关闭,否则 SSE 不生效
Nginx proxy_read_timeout 86400 长连接超时设为 24 小时
连接数限制 6 个/域名 浏览器对同域 HTTP/1.1 有 6 连接限制
事件缓冲区大小 500-1000 条 平衡内存与重连覆盖范围
gzip 压缩 开启 SSE 文本数据压缩率很高

Nginx 关键配置:

# nginx.conf — SSE 反向代理配置
location /events {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;              # 🔴 必须关闭
    proxy_cache off;                  # 🔴 禁止缓存
    proxy_read_timeout 86400s;        # 长连接超时 24 小时
    chunked_transfer_encoding on;
}

⚠️ **警告:**浏览器对同一域名的 HTTP/1.1 连接数限制为 6 个。如果你的页面同时打开多个 SSE 连接,会阻塞其他 HTTP 请求。解决方案:使用独立域名(如 sse.example.com)托管 SSE 端点,或使用 HTTP/2(单连接多路复用)。

3.4 常见坑点与避坑指南

❌ 坑点 1:忘记设置 Cache-Control: no-cache

CDN 会缓存 SSE 响应,导致所有客户端收到的是同一份缓存数据,而非实时推送。

**✅ 正确做法:**在响应头中明确禁止缓存,并加上 X-Accel-Buffering: no 禁用 Nginx 缓冲。

❌ 坑点 2:JSON 数据中包含换行符

SSE 用 \n\n 分隔事件。如果你的 JSON 数据包含换行符,会破坏事件边界。

**✅ 正确做法:**多行数据用 data: 前缀逐行发送,接收端自动拼接:

data: {"message":"第一行\n
data: 第二行"}
// 接收端会自动将多个 data: 行拼接,中间用 \n 连接
evtSource.onmessage = (e) => {
  // e.data === '{"message":"第一行\n第二行"}'
  console.log(JSON.parse(e.data))
}

❌ 坑点 3:Node.js 未正确处理 req.on('close')

客户端断开后,服务端继续向已关闭的 socket 写入数据,最终导致内存泄漏和 ERR_STREAM_WRITE_AFTER_END 错误。

**✅ 正确做法:**始终监听 close 事件并清理资源,使用 try-catch 包裹写操作。

💡 四、SSE 的局限与替代方案

SSE 并非万能。以下场景应选择 WebSocket:

  1. 双向通信:聊天室、协同编辑——客户端需要频繁发送数据
  2. 二进制传输:音视频流、文件传输——SSE 只支持文本
  3. 极低延迟:游戏、实时竞价——WebSocket 的帧开销比 SSE 小
  4. 大量并发连接:WebSocket 的内存占用比 SSE 低(无需 HTTP 头部开销)

但对于以下场景,SSE 是明确的最优解:

  1. LLM 流式输出:逐 token 返回,天然单向
  2. 实时通知/告警:服务端推送给客户端
  3. 数据看板/监控:实时更新指标
  4. 进度条/任务状态:长任务的进度反馈
  5. Serverless 平台:Vercel、Cloudflare Workers 原生支持 SSE

⚡ **关键结论:**SSE 不是 WebSocket 的「低配版」,而是针对「服务端推送」场景的最优解。它的自动重连、事件 ID 追踪、HTTP 兼容性,让它在生产环境中比 WebSocket 更可靠、更易维护。

✅ 总结

SSE 是 2026 年最被低估的实时通信技术。随着 LLM 应用的爆发,SSE 已经从「小众 API」变成了每个开发者的必备技能。它的核心优势在于:用最简单的协议(HTTP)实现最可靠的实时推送

如果你正在构建以下应用,立刻用 SSE 替换你的轮询逻辑:

  • LLM 对话界面 → SSE 流式输出
  • 后台管理看板 → SSE 实时指标
  • 文件上传/处理 → SSE 进度推送
  • 系统告警通知 → SSE 事件推送

相关工具推荐:

📚 相关文章