Redis Streams 实战指南:构建生产级事件驱动系统

深入讲解 Redis Streams 的核心机制、消费者组模式和生产实战技巧,对比 Kafka/RabbitMQ,手把手构建可靠的事件驱动微服务架构。

数据库 2026-05-30 15 分钟

在微服务架构中,事件驱动(Event-Driven)模式已经成为服务间通信的主流选择。Redis 5.0 引入的 Streams 数据结构,提供了一种介于 Redis Pub/Sub 和 Apache Kafka 之间的轻量级消息方案——持久化、支持消费者组、具备消息确认机制,却只需要一个你可能已经在用的 Redis 实例。根据 Redis 官方基准测试,单节点 Streams 的吞吐量可达 每秒 100 万条消息,而端到端延迟在亚毫秒级别。

很多开发者在需要消息队列时,第一反应是引入 Kafka 或 RabbitMQ,但对于中小规模场景(日消息量 1000 万以下),Redis Streams 往往是更务实的选择——运维成本低、学习曲线平、与现有 Redis 基础设施无缝集成。

📨 一、Redis Streams 核心机制详解

Redis Streams 本质上是一个仅追加(Append-Only)的日志数据结构,每条消息都有一个自动生成的 ID(格式为 时间戳-序号),支持多消费者并行读取和消息确认。

🔑 1.1 消息写入与读取基础

最基本的写入操作是 XADD,读取操作是 XREAD

// 基础:写入和读取 Redis Streams 消息
import Redis from 'ioredis';

const redis = new Redis({ host: 'localhost', port: 6379 });

// 写入消息(* 表示自动生成 ID)
const msgId = await redis.xadd(
  'orders',           // stream key
  '*',                // 自动生成 ID
  'event', 'created', // field-value 对
  'orderId', '10042',
  'userId', 'u_2891',
  'amount', '299.00'
);
console.log('Message ID:', msgId); // 例如 "1717142400000-0"

// 读取最新消息(从指定 ID 开始,0-0 表示从头读取)
const messages = await redis.xread(
  'COUNT', 10,        // 最多读 10 条
  'STREAMS', 'orders', // stream key
  '0-0'               // 起始 ID(0-0 = 从头)
);
console.log(JSON.stringify(messages, null, 2));

💡 **提示:**消息 ID 的格式是 {毫秒时间戳}-{序号},例如 1717142400000-0。在同一毫秒内写入多条消息时,序号会自动递增。你也可以指定自定义 ID,但通常让 Redis 自动生成即可。

👥 1.2 消费者组:并行消费的核心

单消费者模式无法并行处理,而消费者组(Consumer Group)允许多个消费者协作消费同一个 Stream,每条消息只会被组内的一个消费者处理:

// 创建消费者组并进行消费
import Redis from 'ioredis';

const redis = new Redis();

// 1. 创建消费者组(MKSTREAM = 如果 stream 不存在则自动创建)
await redis.xgroup('CREATE', 'orders', 'order-processors', '0', 'MKSTREAM');

// 2. 消费者 A 读取消息
const msgsA = await redis.xreadgroup(
  'GROUP', 'order-processors', 'consumer-a',  // 组名 + 消费者名
  'COUNT', 5,                                  // 每次最多读 5 条
  'BLOCK', 2000,                               // 阻塞等待 2 秒
  'NOACK',                                     // 不需要显式 ACK(不推荐生产使用)
  'STREAMS', 'orders', '>'
);
console.log('Consumer A got:', msgsA);

// 3. 消费者 B 同时读取(同组内不会收到 A 已读取的消息)
const msgsB = await redis.xreadgroup(
  'GROUP', 'order-processors', 'consumer-b',
  'COUNT', 5,
  'BLOCK', 2000,
  'STREAMS', 'orders', '>'
);
console.log('Consumer B got:', msgsB);

// 4. 处理完成后确认消息(重要!)
// 假设 msgsA 返回了消息 ID "1717142400000-0"
await redis.xack('orders', 'order-processors', '1717142400000-0');

⚠️ 警告:永远不要在生产环境中使用 NOACK。没有 ACK 的消息在消费者崩溃后不会被重新投递,你会静默丢失消息。务必在业务逻辑处理完成后显式调用 XACK

消费者组的关键行为:

  • 负载均衡:同组内的消费者各自接收不同的消息,实现天然的负载分散
  • 消息持久化:消息写入 Redis 后持久保存,消费者重启后可以继续消费
  • 消息确认XACK 确认后消息才标记为「已处理」,未确认的消息可以被重投
  • 不支持广播:同组内每条消息只被一个消费者收到,如需广播需创建多个组

🔄 1.3 Pending 消息与故障恢复

消费者组维护了一个 Pending Entries List(PEL),记录已读取但未确认的消息。当消费者崩溃时,这些消息可以被其他消费者接管:

// 故障恢复:接管挂掉消费者的消息
import Redis from 'ioredis';

const redis = new Redis();

// 1. 查看 stream 的 pending 消息概览
const pendingSummary = await redis.xpending('orders', 'order-processors');
console.log('Pending summary:', pendingSummary);
// 返回:[pending总数, 最小ID, 最大ID, [消费者名, 待确认数]]

// 2. 查看某个消费者的具体 pending 消息
const pendingDetail = await redis.xpending(
  'orders', 'order-processors',
  'IDLE', 60000,    // 空闲超过 60 秒的消息
  '-', '+',         // ID 范围(全部)
  10                // 最多返回 10 条
);
console.log('Idle pending messages:', pendingDetail);

// 3. 接管(Claim)挂掉消费者的消息
const claimed = await redis.xclaim(
  'orders', 'order-processors',
  'consumer-b',     // 新消费者名
  60000,            // 空闲阈值:60 秒
  '1717142400000-0' // 要接管的消息 ID
);
console.log('Claimed messages:', claimed);

// 4. 处理并确认接管的消息
for (const [id, fields] of claimed) {
  // ... 重新处理业务逻辑
  await redis.xack('orders', 'order-processors', id);
}

📌 记住:XCLAIM 只接管消息的所有权,不会重新投递消息内容。如果需要获取消息的原始数据并重新处理,使用 JUSTID 选项配合 XRANGE 查看。更推荐使用 Redis 6.2+ 的 XAUTOCLAIM,它会自动扫描并接管超时消息。

🏗️ 二、实战:构建订单事件处理系统

下面我们用 Redis Streams 构建一个完整的订单事件处理系统,包含事件发布、多消费者并行处理、失败重试和死信队列。

📋 2.1 架构设计

┌─────────┐    XADD     ┌──────────────┐    XREADGROUP    ┌──────────────┐
│  订单服务  │ ────────→  │ orders stream │ ──────────────→  │ 库存消费者    │
└─────────┘             │              │                  ├──────────────┤
                        │              │    XREADGROUP    │ 支付消费者    │
                        │              │ ──────────────→  ├──────────────┤
                        │              │                  │ 通知消费者    │
                        └──────────────┘                  └──────────────┘
                               │                                  │
                          XCLAIM (恢复)                     XACK (确认)
                               │                                  │
                               ↓                                  ↓
                        ┌──────────────┐                  ┌──────────────┐
                        │  死信队列     │                  │  处理完成      │
                        │ orders:dead  │                  │  XDEL 清理    │
                        └──────────────┘                  └──────────────┘

⚙️ 2.2 完整实现代码

// 订单事件处理系统:生产者 + 消费者 + 死信队列
import Redis from 'ioredis';

const redis = new Redis({ maxRetriesPerRequest: 3 });

// ==================== 事件发布者 ====================
class OrderEventProducer {
  constructor(streamKey = 'orders') {
    this.streamKey = streamKey;
  }

  async publish(event, data) {
    const fields = ['event', event, 'data', JSON.stringify(data), 'timestamp', Date.now().toString()];
    const id = await redis.xadd(this.streamKey, '*', ...fields);
    console.log(`[Producer] Published ${event}, ID: ${id}`);
    return id;
  }
}

// ==================== 事件消费者基类 ====================
class StreamConsumer {
  constructor(streamKey, groupName, consumerName, options = {}) {
    this.streamKey = streamKey;
    this.groupName = groupName;
    this.consumerName = consumerName;
    this.maxRetries = options.maxRetries || 3;
    this.batchSize = options.batchSize || 10;
    this.blockMs = options.blockMs || 5000;
    this.running = false;
  }

  async init() {
    try {
      await redis.xgroup('CREATE', this.streamKey, this.groupName, '0', 'MKSTREAM');
    } catch (e) {
      if (!e.message.includes('BUSYGROUP')) throw e; // 组已存在则忽略
    }
  }

  async start() {
    this.running = true;
    console.log(`[Consumer:${this.consumerName}] Started`);

    while (this.running) {
      try {
        // 读取新消息
        const results = await redis.xreadgroup(
          'GROUP', this.groupName, this.consumerName,
          'COUNT', this.batchSize,
          'BLOCK', this.blockMs,
          'STREAMS', this.streamKey, '>'
        );

        if (!results) continue;

        for (const [, messages] of results) {
          for (const [id, fields] of messages) {
            await this.processMessage(id, fields);
          }
        }

        // 定期处理超时的 pending 消息
        await this.claimStaleMessages();
      } catch (err) {
        if (this.running) {
          console.error(`[Consumer:${this.consumerName}] Error:`, err.message);
          await new Promise(r => setTimeout(r, 1000)); // 错误后短暂等待
        }
      }
    }
  }

  async processMessage(id, fields) {
    const data = this.parseFields(fields);
    try {
      await this.handle(data);
      await redis.xack(this.streamKey, this.groupName, id);
      console.log(`[Consumer:${this.consumerName}] Processed ${id}`);
    } catch (err) {
      console.error(`[Consumer:${this.consumerName}] Failed ${id}:`, err.message);
      await this.handleFailure(id, data, err);
    }
  }

  async handleFailure(id, data, error) {
    // 检查重试次数
    const retries = await redis.hget(`retry:${id}`, 'count') || '0';
    const count = parseInt(retries) + 1;

    if (count >= this.maxRetries) {
      // 超过最大重试次数,移入死信队列
      await redis.xadd(`${this.streamKey}:dead`, '*',
        'originalId', id,
        'data', JSON.stringify(data),
        'error', error.message,
        'retries', count.toString()
      );
      await redis.xack(this.streamKey, this.groupName, id);
      await redis.del(`retry:${id}`);
      console.warn(`[DeadLetter] ${id} moved to dead letter queue after ${count} retries`);
    } else {
      await redis.hset(`retry:${id}`, 'count', count.toString(), 'lastError', error.message);
      console.log(`[Retry] ${id} will be retried (attempt ${count}/${this.maxRetries})`);
    }
  }

  async claimStaleMessages() {
    try {
      const claimed = await redis.xautoclaim(
        this.streamKey, this.groupName, this.consumerName,
        30000,  // 空闲 30 秒的消息
        '0-0',
        'COUNT', 5
      );
      // claimed[0] 是下一个 start ID,claimed[1] 是接管的消息
      if (claimed[1] && claimed[1].length > 0) {
        console.log(`[Consumer:${this.consumerName}] Claimed ${claimed[1].length} stale messages`);
        for (const [id, fields] of claimed[1]) {
          await this.processMessage(id, fields);
        }
      }
    } catch (e) {
      // XAUTOCLAIM 需要 Redis 6.2+
    }
  }

  parseFields(fields) {
    const obj = {};
    for (let i = 0; i < fields.length; i += 2) {
      obj[fields[i]] = fields[i + 1];
    }
    if (obj.data) obj.data = JSON.parse(obj.data);
    return obj;
  }

  stop() {
    this.running = false;
  }

  // 子类实现
  async handle(data) { throw new Error('Not implemented'); }
}

// ==================== 具体消费者实现 ====================
class InventoryConsumer extends StreamConsumer {
  constructor() {
    super('orders', 'order-processors', 'inventory-worker');
  }
  async handle({ event, data }) {
    if (event !== 'order.created') return;
    console.log(`[Inventory] Reserving stock for order ${data.orderId}`);
    // 模拟库存扣减
    await new Promise(r => setTimeout(r, 50));
  }
}

class PaymentConsumer extends StreamConsumer {
  constructor() {
    super('orders', 'order-processors', 'payment-worker');
  }
  async handle({ event, data }) {
    if (event !== 'order.created') return;
    console.log(`[Payment] Processing payment for order ${data.orderId}: ¥${data.amount}`);
    // 模拟支付处理
    await new Promise(r => setTimeout(r, 100));
  }
}

// ==================== 启动示例 ====================
const producer = new OrderEventProducer();
const inventory = new InventoryConsumer();
const payment = new PaymentConsumer();

await inventory.init();
await payment.init();

// 并行启动消费者
Promise.all([inventory.start(), payment.start()]);

// 模拟订单创建
for (let i = 0; i < 20; i++) {
  await producer.publish('order.created', {
    orderId: `ORD-${1000 + i}`,
    userId: `U-${Math.floor(Math.random() * 1000)}`,
    amount: (Math.random() * 500 + 50).toFixed(2),
  });
}

💡 提示:XREADGROUP> 参数表示「只读取从未被任何消费者处理过的新消息」。如果要读取当前消费者自己未确认的消息(重试场景),使用 0-0 替代 >

📊 三、方案对比与选型指南

在选择消息队列方案时,需要根据实际场景权衡。以下是 Redis Streams 与主流方案的对比:

特性 Redis Streams Apache Kafka RabbitMQ BullMQ
消息持久化 ✅ 内存 + RDB/AOF ✅ 磁盘 ✅ 磁盘 ✅ Redis
消费者组 ✅ 原生支持 ✅ 原生支持 ⚠️ 需插件 ✅ 原生支持
消息回溯 ✅ 按 ID 回溯 ✅ 按 offset ❌ 消费即删
延迟 ~0.1ms ~2ms ~1ms ~0.5ms
单节点吞吐 ~100万 msg/s ~10万 msg/s ~5万 msg/s ~50万 msg/s
运维复杂度 ⭐ 极低 ⭐⭐⭐⭐ 高 ⭐⭐⭐ 中 ⭐ 极低
消息积压处理 ✅ 内存可控 ✅ 磁盘 ⚠️ 内存压力 ✅ Redis
适合场景 中小规模实时 大规模日志流 复杂路由 任务队列
推荐指数 ✅ 中小项目首选 ✅ 大数据场景 ⚠️ 复杂场景 ✅ Node.js 项目

🎯 选型决策树

根据你的场景选择合适的方案:

  • 选 Redis Streams:日消息量 < 1000 万,已有 Redis 基础设施,追求低运维成本
  • 选 Kafka:日消息量 > 1 亿,需要长期消息保留(天/周级别),大数据生态集成
  • 选 RabbitMQ:需要复杂路由规则(topic/fanout/direct),多协议支持
  • 选 BullMQ:纯 Node.js 项目,需要任务调度(延迟任务、定时任务、优先级队列)
  • 不要用 Redis Streams:消息不能丢失(金融级)、需要跨数据中心复制

⚠️ **警告:**Redis Streams 的消息存储在内存中,受 maxmemory 限制。如果消息积压超过内存上限,根据淘汰策略(volatile-lru 等)可能会丢失消息。务必配置 XTRIMMAXLEN 来控制 stream 长度。

⚠️ 四、生产环境的坑点与最佳实践

🕳️ 4.1 常见坑点

在生产中使用 Redis Streams,以下是最常踩的坑:

坑点 1:忘记 ACK 导致内存泄漏

未确认的消息会一直保留在 PEL 中,长期积累会导致内存膨胀。需要定期监控 pending 数量并处理超时消息。

// 监控消费者组的健康状态
async function monitorConsumerGroup(streamKey, groupName) {
  const pending = await redis.xpending(streamKey, groupName);
  const [total, minId, maxId, consumers] = pending;

  console.log(`Pending messages: ${total}`);
  if (total > 1000) {
    console.warn(`⚠️ 积压消息过多: ${total},需要检查消费者状态`);
  }

  // 检查各消费者的 pending 数量
  for (const [consumerName, count] of consumers) {
    console.log(`  ${consumerName}: ${count} pending`);
    if (count > 500) {
      console.warn(`  ⚠️ ${consumerName} 积压严重,可能已崩溃`);
    }
  }
}

坑点 2:Stream 无限增长撑爆内存

XADD 默认不会自动清理历史消息,必须使用 MAXLENMINID 参数:

// ❌ 错误写法:不控制 stream 大小
await redis.xadd('orders', '*', 'event', 'created', 'data', '...');

// ✅ 正确写法:使用 MAXLEN 限制最大长度(近似裁剪,性能更好)
await redis.xadd('orders', 'MAXLEN', '~', '100000', '*', 'event', 'created', 'data', '...');

// ✅ 更好:使用 MINID 按时间清理(保留最近 24 小时)
const oneDayAgo = Date.now() - 86400000;
await redis.xadd('orders', 'MINID', '~', `${oneDayAgo}-0`, '*', 'event', 'created');

// ✅ 定期修剪:适合大批量清理
await redis.xtrim('orders', 'MAXLEN', '~', '50000');

💡 提示:~ 符号表示「近似裁剪」,Redis 会在内部以 100 条消息为单位批量删除,性能远优于精确裁剪。对于大多数场景,近似裁剪的结果已经足够好。

坑点 3:消费者名冲突导致消息「消失」

如果两个进程使用相同的消费者名加入同一个组,消息会在它们之间随机分配,导致看似「丢失」。确保每个消费者实例有唯一的名称(推荐使用 hostname + pid + uuid)。

✅ 4.2 生产环境最佳实践

  • 为每个业务域创建独立的 Stream,不要把所有事件塞进一个 key
  • 使用 MAXLEN/MINID 控制 Stream 大小,建议保留最近 24-72 小时的消息
  • 实现死信队列,超过重试次数的消息不要丢弃,移到 stream:dead 便于排查
  • 监控 pending 消息数量,设置告警阈值(如单消费者 pending > 500)
  • ✅ **使用 XAUTOCLAIM(Redis 6.2+)**自动接管超时消费者的消息
  • 在业务逻辑幂等后才 ACK,确保消息不会因处理中途失败而丢失
  • 不要用 NOACK,除非你能接受消息丢失
  • 不要在一个 Stream 中混合不同类型的事件,会导致消费者过滤效率低下
  • 不要依赖 Redis Streams 做跨数据中心复制,它只支持单节点或主从复制

📐 4.3 容量规划参考

根据实际场景估算 Redis Streams 的内存占用:

消息大小 每日消息量 保留时长 预估内存占用
500 bytes 100 万 24 小时 ~500 MB
1 KB 100 万 24 小时 ~1 GB
1 KB 1000 万 24 小时 ~10 GB
2 KB 1000 万 72 小时 ~60 GB

⚠️ **警告:**以上为粗略估算,实际内存占用受 Redis 内部编码、字段名复用率等因素影响。建议在预生产环境用实际数据做基准测试。如果消息量超过单机内存容量,考虑分片(多个 stream key)或降级到 Kafka。

🔧 五、与 Node.js 生态的集成

Redis Streams 与 Node.js 生态有很好的集成,以下是常用库的对比:

特点 推荐场景
ioredis 底层 API,完全控制 需要精细调优的生产场景
bullmq 高层封装,内置任务调度 纯任务队列场景
better-queue 轻量级,简单易用 小项目快速原型

对于大多数 Node.js 项目,直接使用 ioredis 操作 Streams API 是最灵活的方式。如果你的需求是任务队列(延迟执行、优先级、定时任务),bullmq 是更好的选择,它在 Redis Streams 之上封装了更高级的功能。

📝 总结

Redis Streams 是一个被严重低估的消息队列方案。它不像 Kafka 那样「大而全」,但在中小规模场景下,它的低运维成本、亚毫秒延迟、与现有 Redis 基础设施的无缝集成,使它成为非常务实的选择。

⚡ **关键结论:**如果你的日消息量在 1000 万以下,已有 Redis 实例,且不需要跨数据中心复制,Redis Streams 几乎总是比引入 Kafka 更好的选择。它让你在不增加运维复杂度的前提下,获得可靠的消息投递能力。

相关工具推荐:

📚 相关文章