Server-Sent Events 实战:LLM 时代最被低估的实时通信方案

深入解析 SSE 协议原理与实战,对比 WebSocket 选型策略,覆盖 Node.js/Go/Java 多语言实现,包含 LLM 流式输出、实时推送等真实场景代码。

前端开发 2026-05-29 12 分钟

几乎所有主流 LLM 服务——ChatGPT、Claude、Gemini——的流式输出都基于同一个协议:Server-Sent Events(SSE)。这个诞生于 2004 年、被写入 HTML5 规范的协议,在 AI 时代迎来了第二春。然而多数开发者对 SSE 的理解仍停留在"比 WebSocket 简单"的层面,对它的断线重连机制、HTTP/2 多路复用优势、以及在高并发场景下的陷阱知之甚少。本文将从协议原理到生产实战,彻底讲透 SSE。

🔌 一、SSE 协议原理与核心机制

SSE 的本质是 HTTP 单向流:客户端发起普通 HTTP 请求,服务端不返回完整响应,而是持续发送 text/event-stream 格式的数据块,直到连接关闭。浏览器原生提供 EventSource API 来消费这种流。

📡 1.1 协议格式详解

SSE 的数据格式极其简洁,每条事件由以下字段组成:

id: 42
event: message
retry: 5000
data: {"content": "Hello, World!"}
data: {"continued": true}

  • id:事件 ID,断线重连时通过 Last-Event-Id 头告知服务端
  • event:事件类型,默认为 message
  • retry:建议客户端的重连间隔(毫秒)
  • data:事件数据,支持多行(每行以 data: 开头)
  • 空行\n\n):标志一条事件结束

💡 提示:data 字段支持多行,但每行都必须以 data: 前缀。这是初学者最容易犯的错误——直接写多行文本会导致只有第一行被接收。

下面是一个原生 Node.js SSE 服务端的最小实现:

// server.js — 最小 SSE 服务端(Node.js 原生 HTTP)
import http from 'node:http';

const server = http.createServer((req, res) => {
  if (req.url !== '/events') {
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end('<h1>SSE Server Running</h1>');
    return;
  }

  // SSE 响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  let id = 0;
  const interval = setInterval(() => {
    id++;
    const data = JSON.stringify({ time: new Date().toISOString(), id });
    // 严格遵循 SSE 格式:id + data + 空行
    res.write(`id: ${id}\n`);
    res.write(`data: ${data}\n\n`);
  }, 1000);

  // 客户端断开时清理
  req.on('close', () => {
    clearInterval(interval);
    res.end();
  });
});

server.listen(3000, () => console.log('SSE on http://localhost:3000'));

对应的客户端消费代码:

<!-- client.html — EventSource 客户端 -->
<script>
const es = new EventSource('/events');

// 监听默认 message 事件
es.onmessage = (e) => {
  const data = JSON.parse(e.data);
  console.log(`[${data.id}] ${data.time}`);
};

// 浏览器自动处理断线重连
es.onerror = (e) => {
  console.warn('连接异常,浏览器将自动重连...');
};

// 自定义事件类型
es.addEventListener('alert', (e) => {
  console.log('收到告警:', e.data);
});
</script>

🔄 1.2 断线重连:SSE 的杀手级特性

这是 SSE 相比 WebSocket 最大的优势之一。EventSource 内置了**自动重连 + 事件溯源(Event Sourcing)**机制:

  1. 连接断开后,浏览器自动按 retry 字段指定的间隔重连
  2. 重连请求自动携带 Last-Event-Id
  3. 服务端根据该 ID 返回断线期间的所有事件,不丢数据
// 服务端:利用 Last-Event-Id 实现断线续传
const server = http.createServer((req, res) => {
  if (req.url !== '/events') return;

  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // 从客户端断线位置恢复
  const lastId = parseInt(req.headers['last-event-id'] || '0', 10);
  let currentId = lastId;

  // 先补发断线期间的事件
  for (let i = lastId + 1; i <= getLatestId(); i++) {
    const data = getEventById(i); // 从缓存/数据库获取
    res.write(`id: ${i}\ndata: ${JSON.stringify(data)}\n\n`);
  }

  // 继续发送新事件
  const interval = setInterval(() => {
    currentId++;
    const data = { id: currentId, content: 'new event' };
    res.write(`id: ${currentId}\ndata: ${JSON.stringify(data)}\n\n`);
  }, 1000);

  req.on('close', () => clearInterval(interval));
});

⚠️ **警告:**如果你不设置 id 字段,断线重连时客户端无法告知服务端"上次收到哪条",重连后会从头开始接收,导致重复数据。在生产环境中,每条事件必须有唯一且递增的 id

⚖️ 二、SSE vs WebSocket 深度对比

"该用 SSE 还是 WebSocket?“是我被问得最多的架构选型问题之一。答案取决于你的场景,而不是哪个"更先进”。

📊 2.1 全维度对比

维度 SSE WebSocket
通信方向 单向(服务端→客户端) 全双工
协议 HTTP/1.1 或 HTTP/2 ws:// 或 wss://
自动重连 ✅ 浏览器原生支持 ❌ 需手动实现
事件溯源 ✅ Last-Event-Id ❌ 需手动实现
负载均衡 ✅ 标准 HTTP,LB 无感知 ⚠️ 需要支持 WebSocket 的 LB
代理/CDN 兼容 ✅ 完美兼容 ⚠️ 部分代理不支持 Upgrade
二进制数据 ❌ 仅文本 ✅ 支持二进制帧
跨域 ✅ 标准 CORS ✅ 自带 Origin 检查
连接数限制 6/域名(HTTP/1.1),HTTP/2 无限制 无同源限制
实现复杂度 ⭐ 极低 ⭐⭐⭐ 中等

🎯 2.2 选型决策树

我的经验法则:

选 SSE 的场景:

  • ✅ LLM/AI 流式输出(token by token)
  • ✅ 实时通知、消息推送
  • ✅ 实时数据看板(股票行情、监控指标)
  • ✅ 进度条更新(文件上传、任务执行)
  • ✅ 需要经过企业代理/防火墙的场景

选 WebSocket 的场景:

  • ✅ 在线聊天(双向实时)
  • ✅ 协同编辑(如 Google Docs)
  • ✅ 实时游戏(低延迟双向通信)
  • ✅ 需要传输二进制数据(音视频流)

📌 **记住:**如果你只需要服务端向客户端推送数据,SSE 几乎总是更好的选择。它更简单、更可靠、更符合 HTTP 生态。不要因为 WebSocket "更酷"就盲目选择。

🚀 2.3 HTTP/2 下的 SSE 优势

在 HTTP/1.1 下,SSE 有一个广为人知的限制:每个域名最多 6 个并发连接(浏览器限制)。但在 HTTP/2 下,这个问题完全消失——所有请求共享一个 TCP 连接,通过流(Stream)多路复用。

# Nginx 配置:启用 HTTP/2 + SSE
server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /path/to/cert.pem;
    ssl_certificate_key /path/to/key.pem;

    location /events {
        proxy_pass http://backend:3000;
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_buffering off;           # 关键!禁用缓冲
        proxy_cache off;               # 禁用缓存
        chunked_transfer_encoding off; # 禁用分块编码
        proxy_read_timeout 86400s;     # 长连接超时 24 小时
    }
}

⚠️ **警告:**Nginx 默认会缓冲响应(proxy_buffering),这会导致 SSE 事件延迟到达客户端。必须设置 proxy_buffering off。这是 SSE 部署中最常见的坑。

🛠️ 三、多语言生产级实现

🟢 3.1 Node.js + Express:LLM 流式输出

这是最贴近实际开发的场景——对接 OpenAI API,将 LLM 的流式输出转发给前端:

// llm-stream.js — LLM 流式输出代理(Node.js + Express)
import express from 'express';

const app = express();
app.use(express.json());

app.post('/api/chat', async (req, res) => {
  const { message } = req.body;

  // SSE 响应头
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  try {
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
      },
      body: JSON.stringify({
        model: 'gpt-4o',
        messages: [{ role: 'user', content: message }],
        stream: true, // 开启流式输出
      }),
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop(); // 保留不完整的行

      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice(6);
        if (data === '[DONE]') {
          res.write('data: [DONE]\n\n');
          break;
        }

        try {
          const parsed = JSON.parse(data);
          const token = parsed.choices?.[0]?.delta?.content;
          if (token) {
            // 转发给前端客户端
            res.write(`data: ${JSON.stringify({ token })}\n\n`);
          }
        } catch {
          // 忽略解析错误
        }
      }
    }
  } catch (error) {
    res.write(`event: error\ndata: ${JSON.stringify({ message: error.message })}\n\n`);
  } finally {
    res.end();
  }
});

app.listen(3000);

前端消费:

// chat-client.js — 前端 LLM 流式消费
async function sendMessage(message) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let fullText = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6).trim();
      if (data === '[DONE]') return fullText;

      try {
        const { token } = JSON.parse(data);
        fullText += token;
        // 实时更新 UI
        document.getElementById('output').textContent = fullText;
      } catch {}
    }
  }

  return fullText;
}

💡 **提示:**注意上面的前端代码没有使用 EventSource,而是用 fetch + ReadableStream。原因是 EventSource 只支持 GET 请求,而 LLM API 通常需要 POST 请求体。这是 SSE 实战中最容易踩的坑之一。

🔵 3.2 Go:高并发 SSE 服务

Go 的 goroutine 天然适合 SSE 这种长连接场景。下面是一个支持多客户端广播的 SSE 服务:

// main.go — Go SSE 广播服务
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
)

// Event 结构体
type Event struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// SSEBroker 管理所有客户端连接
type Broker struct {
    clients map[chan Event]bool
    mu      sync.RWMutex
    nextID  int
}

func NewBroker() *Broker {
    return &Broker{clients: make(map[chan Event]bool)}
}

func (b *Broker) Subscribe() chan Event {
    ch := make(chan Event, 64) // 带缓冲,防止慢消费者阻塞
    b.mu.Lock()
    b.clients[ch] = true
    b.mu.Unlock()
    return ch
}

func (b *Broker) Unsubscribe(ch chan Event) {
    b.mu.Lock()
    delete(b.clients, ch)
    b.mu.Unlock()
    close(ch)
}

func (b *Broker) Broadcast(data string) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    b.nextID++
    event := Event{ID: b.nextID, Data: data}
    for ch := range b.clients {
        select {
        case ch <- event:
        default:
            // 慢消费者,丢弃事件而非阻塞
            fmt.Printf("client %p is slow, dropping event %d\n", ch, event.ID)
        }
    }
}

func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // SSE 响应头
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }

    ch := b.Subscribe()
    defer b.Unsubscribe(ch)

    // 从 Last-Event-Id 恢复
    lastID := 0
    if id := r.Header.Get("Last-Event-Id"); id != "" {
        fmt.Sscanf(id, "%d", &lastID)
    }

    ctx := r.Context()
    for {
        select {
        case event := <-ch:
            if event.ID <= lastID {
                continue // 跳过已接收的事件
            }
            payload, _ := json.Marshal(event.Data)
            fmt.Fprintf(w, "id: %d\ndata: %s\n\n", event.ID, payload)
            flusher.Flush()
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    broker := NewBroker()

    // 每 2 秒广播一次
    go func() {
        for {
            time.Sleep(2 * time.Second)
            msg := fmt.Sprintf("server time: %s", time.Now().Format(time.RFC3339))
            broker.Broadcast(msg)
        }
    }()

    http.Handle("/events", broker)
    http.ListenAndServe(":8080", nil)
}

☕ 3.3 Spring Boot:企业级 SSE 端点

// SseController.java — Spring Boot SSE 端点
package com.example.sse;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;

@RestController
@RequestMapping("/api")
public class SseController {

    private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter subscribe() {
        // 超时设为 0 表示不超时(生产环境建议设为合理值)
        SseEmitter emitter = new SseEmitter(0L);

        emitters.add(emitter);

        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> emitters.remove(emitter));
        emitter.onError(e -> emitters.remove(emitter));

        // 发送初始连接确认
        try {
            emitter.send(SseEmitter.event()
                .name("connected")
                .data("OK")
                .id(String.valueOf(System.currentTimeMillis())));
        } catch (IOException e) {
            emitters.remove(emitter);
        }

        return emitter;
    }

    @PostMapping("/broadcast")
    public void broadcast(@RequestBody String message) {
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                    .name("message")
                    .data(message)
                    .id(String.valueOf(System.currentTimeMillis())));
            } catch (IOException e) {
                emitters.remove(emitter);
            }
        }
    }
}

⚠️ **警告:**Spring 的 SseEmitter 默认超时是 30 秒。如果不设置 new SseEmitter(0L),连接会在 30 秒后自动断开。此外,CopyOnWriteArrayList 在客户端数量多(>1000)时性能下降,生产环境应考虑使用 Redis Pub/Sub 做广播。

🏗️ 四、生产环境避坑指南

⚠️ 4.1 六大常见陷阱

经过在多个项目中踩坑总结,以下是 SSE 生产部署最容易出问题的地方:

陷阱 症状 解决方案
Nginx 代理缓冲 事件延迟 30 秒到达 proxy_buffering off
不设 ID 字段 断线重连丢数据或重复 每条事件必须有递增 id
EventSource 只支持 GET 无法发送请求体 改用 fetch + ReadableStream
HTTP/1.1 连接数限制 高并发下新连接被拒 升级 HTTP/2
慢消费者阻塞 服务端内存溢出 设置缓冲队列,丢弃慢消费者
心跳缺失 代理/LB 因空闲断开连接 每 15-30 秒发送 : ping\n\n

💓 4.2 心跳保活实现

// heartbeat.js — SSE 心跳保活
function setupSSE(req, res) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // 每 15 秒发送心跳(注释行,不会触发事件)
  const heartbeat = setInterval(() => {
    res.write(': ping\n\n'); // 以冒号开头的是注释,客户端忽略
  }, 15000);

  req.on('close', () => {
    clearInterval(heartbeat);
  });
}

📌 **记住:**SSE 注释行以冒号(:)开头,客户端的 EventSource 会忽略它,但 TCP 连接因此保持活跃。这是防止中间代理(Nginx、CloudFlare、AWS ALB)因空闲超时而断开连接的标准做法。

🔐 4.3 认证方案

SSE 不支持自定义请求头(EventSource 的 API 限制),认证方案有三种:

// 方案一:Cookie 认证(同域推荐)
const es = new EventSource('/events', { withCredentials: true });

// 方案二:URL 参数传递 Token(简单但有日志泄露风险)
const es = new EventSource(`/events?token=${accessToken}`);

// 方案三:先 POST 获取短生命周期 ticket,再用 ticket 建立 SSE
async function connectSSE() {
  // 1. POST 获取一次性 ticket
  const { ticket } = await fetch('/api/sse-ticket', {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${token}` },
  }).then(r => r.json());

  // 2. 用 ticket 建立 SSE 连接
  const es = new EventSource(`/events?ticket=${ticket}`);
  return es;
}

⚡ **关键结论:**方案三(ticket 模式)是生产环境最安全的选择。Token 只在 POST 请求中传输(HTTPS 加密),不会出现在 URL 日志、浏览器历史或代理日志中。ticket 是一次性的,即使泄露也无法重放。

📝 总结

SSE 不是 WebSocket 的"低配版",而是一个为单向推送场景精心设计的协议。它的自动重连、事件溯源、HTTP 原生兼容性,在很多场景下比 WebSocket 更可靠、更简单。

我的建议:

  1. 新项目的 LLM 流式输出:首选 SSE + fetch ReadableStream
  2. 实时通知/看板:首选 SSE,除非需要双向通信
  3. 聊天/协同编辑:用 WebSocket
  4. 不确定时:从 SSE 开始,它更容易迁移和调试

相关工具推荐:

📚 相关文章