在微服务架构中,事件驱动(Event-Driven)模式已经成为服务间通信的主流选择。Redis 5.0 引入的 Streams 数据结构,提供了一种介于 Redis Pub/Sub 和 Apache Kafka 之间的轻量级消息方案——持久化、支持消费者组、具备消息确认机制,却只需要一个你可能已经在用的 Redis 实例。根据 Redis 官方基准测试,单节点 Streams 的吞吐量可达 每秒 100 万条消息,而端到端延迟在亚毫秒级别。
很多开发者在需要消息队列时,第一反应是引入 Kafka 或 RabbitMQ,但对于中小规模场景(日消息量 1000 万以下),Redis Streams 往往是更务实的选择——运维成本低、学习曲线平、与现有 Redis 基础设施无缝集成。
📨 一、Redis Streams 核心机制详解
Redis Streams 本质上是一个仅追加(Append-Only)的日志数据结构,每条消息都有一个自动生成的 ID(格式为 时间戳-序号),支持多消费者并行读取和消息确认。
🔑 1.1 消息写入与读取基础
最基本的写入操作是 XADD,读取操作是 XREAD:
// 基础:写入和读取 Redis Streams 消息
import Redis from 'ioredis';
const redis = new Redis({ host: 'localhost', port: 6379 });
// 写入消息(* 表示自动生成 ID)
const msgId = await redis.xadd(
'orders', // stream key
'*', // 自动生成 ID
'event', 'created', // field-value 对
'orderId', '10042',
'userId', 'u_2891',
'amount', '299.00'
);
console.log('Message ID:', msgId); // 例如 "1717142400000-0"
// 读取最新消息(从指定 ID 开始,0-0 表示从头读取)
const messages = await redis.xread(
'COUNT', 10, // 最多读 10 条
'STREAMS', 'orders', // stream key
'0-0' // 起始 ID(0-0 = 从头)
);
console.log(JSON.stringify(messages, null, 2));
💡 **提示:**消息 ID 的格式是
{毫秒时间戳}-{序号},例如1717142400000-0。在同一毫秒内写入多条消息时,序号会自动递增。你也可以指定自定义 ID,但通常让 Redis 自动生成即可。
👥 1.2 消费者组:并行消费的核心
单消费者模式无法并行处理,而消费者组(Consumer Group)允许多个消费者协作消费同一个 Stream,每条消息只会被组内的一个消费者处理:
// 创建消费者组并进行消费
import Redis from 'ioredis';
const redis = new Redis();
// 1. 创建消费者组(MKSTREAM = 如果 stream 不存在则自动创建)
await redis.xgroup('CREATE', 'orders', 'order-processors', '0', 'MKSTREAM');
// 2. 消费者 A 读取消息
const msgsA = await redis.xreadgroup(
'GROUP', 'order-processors', 'consumer-a', // 组名 + 消费者名
'COUNT', 5, // 每次最多读 5 条
'BLOCK', 2000, // 阻塞等待 2 秒
'NOACK', // 不需要显式 ACK(不推荐生产使用)
'STREAMS', 'orders', '>'
);
console.log('Consumer A got:', msgsA);
// 3. 消费者 B 同时读取(同组内不会收到 A 已读取的消息)
const msgsB = await redis.xreadgroup(
'GROUP', 'order-processors', 'consumer-b',
'COUNT', 5,
'BLOCK', 2000,
'STREAMS', 'orders', '>'
);
console.log('Consumer B got:', msgsB);
// 4. 处理完成后确认消息(重要!)
// 假设 msgsA 返回了消息 ID "1717142400000-0"
await redis.xack('orders', 'order-processors', '1717142400000-0');
⚠️ 警告:永远不要在生产环境中使用
NOACK。没有 ACK 的消息在消费者崩溃后不会被重新投递,你会静默丢失消息。务必在业务逻辑处理完成后显式调用XACK。
消费者组的关键行为:
- ✅ 负载均衡:同组内的消费者各自接收不同的消息,实现天然的负载分散
- ✅ 消息持久化:消息写入 Redis 后持久保存,消费者重启后可以继续消费
- ✅ 消息确认:
XACK确认后消息才标记为「已处理」,未确认的消息可以被重投 - ❌ 不支持广播:同组内每条消息只被一个消费者收到,如需广播需创建多个组
🔄 1.3 Pending 消息与故障恢复
消费者组维护了一个 Pending Entries List(PEL),记录已读取但未确认的消息。当消费者崩溃时,这些消息可以被其他消费者接管:
// 故障恢复:接管挂掉消费者的消息
import Redis from 'ioredis';
const redis = new Redis();
// 1. 查看 stream 的 pending 消息概览
const pendingSummary = await redis.xpending('orders', 'order-processors');
console.log('Pending summary:', pendingSummary);
// 返回:[pending总数, 最小ID, 最大ID, [消费者名, 待确认数]]
// 2. 查看某个消费者的具体 pending 消息
const pendingDetail = await redis.xpending(
'orders', 'order-processors',
'IDLE', 60000, // 空闲超过 60 秒的消息
'-', '+', // ID 范围(全部)
10 // 最多返回 10 条
);
console.log('Idle pending messages:', pendingDetail);
// 3. 接管(Claim)挂掉消费者的消息
const claimed = await redis.xclaim(
'orders', 'order-processors',
'consumer-b', // 新消费者名
60000, // 空闲阈值:60 秒
'1717142400000-0' // 要接管的消息 ID
);
console.log('Claimed messages:', claimed);
// 4. 处理并确认接管的消息
for (const [id, fields] of claimed) {
// ... 重新处理业务逻辑
await redis.xack('orders', 'order-processors', id);
}
📌 记住:
XCLAIM只接管消息的所有权,不会重新投递消息内容。如果需要获取消息的原始数据并重新处理,使用JUSTID选项配合XRANGE查看。更推荐使用 Redis 6.2+ 的XAUTOCLAIM,它会自动扫描并接管超时消息。
🏗️ 二、实战:构建订单事件处理系统
下面我们用 Redis Streams 构建一个完整的订单事件处理系统,包含事件发布、多消费者并行处理、失败重试和死信队列。
📋 2.1 架构设计
┌─────────┐ XADD ┌──────────────┐ XREADGROUP ┌──────────────┐
│ 订单服务 │ ────────→ │ orders stream │ ──────────────→ │ 库存消费者 │
└─────────┘ │ │ ├──────────────┤
│ │ XREADGROUP │ 支付消费者 │
│ │ ──────────────→ ├──────────────┤
│ │ │ 通知消费者 │
└──────────────┘ └──────────────┘
│ │
XCLAIM (恢复) XACK (确认)
│ │
↓ ↓
┌──────────────┐ ┌──────────────┐
│ 死信队列 │ │ 处理完成 │
│ orders:dead │ │ XDEL 清理 │
└──────────────┘ └──────────────┘
⚙️ 2.2 完整实现代码
// 订单事件处理系统:生产者 + 消费者 + 死信队列
import Redis from 'ioredis';
const redis = new Redis({ maxRetriesPerRequest: 3 });
// ==================== 事件发布者 ====================
class OrderEventProducer {
constructor(streamKey = 'orders') {
this.streamKey = streamKey;
}
async publish(event, data) {
const fields = ['event', event, 'data', JSON.stringify(data), 'timestamp', Date.now().toString()];
const id = await redis.xadd(this.streamKey, '*', ...fields);
console.log(`[Producer] Published ${event}, ID: ${id}`);
return id;
}
}
// ==================== 事件消费者基类 ====================
class StreamConsumer {
constructor(streamKey, groupName, consumerName, options = {}) {
this.streamKey = streamKey;
this.groupName = groupName;
this.consumerName = consumerName;
this.maxRetries = options.maxRetries || 3;
this.batchSize = options.batchSize || 10;
this.blockMs = options.blockMs || 5000;
this.running = false;
}
async init() {
try {
await redis.xgroup('CREATE', this.streamKey, this.groupName, '0', 'MKSTREAM');
} catch (e) {
if (!e.message.includes('BUSYGROUP')) throw e; // 组已存在则忽略
}
}
async start() {
this.running = true;
console.log(`[Consumer:${this.consumerName}] Started`);
while (this.running) {
try {
// 读取新消息
const results = await redis.xreadgroup(
'GROUP', this.groupName, this.consumerName,
'COUNT', this.batchSize,
'BLOCK', this.blockMs,
'STREAMS', this.streamKey, '>'
);
if (!results) continue;
for (const [, messages] of results) {
for (const [id, fields] of messages) {
await this.processMessage(id, fields);
}
}
// 定期处理超时的 pending 消息
await this.claimStaleMessages();
} catch (err) {
if (this.running) {
console.error(`[Consumer:${this.consumerName}] Error:`, err.message);
await new Promise(r => setTimeout(r, 1000)); // 错误后短暂等待
}
}
}
}
async processMessage(id, fields) {
const data = this.parseFields(fields);
try {
await this.handle(data);
await redis.xack(this.streamKey, this.groupName, id);
console.log(`[Consumer:${this.consumerName}] Processed ${id}`);
} catch (err) {
console.error(`[Consumer:${this.consumerName}] Failed ${id}:`, err.message);
await this.handleFailure(id, data, err);
}
}
async handleFailure(id, data, error) {
// 检查重试次数
const retries = await redis.hget(`retry:${id}`, 'count') || '0';
const count = parseInt(retries) + 1;
if (count >= this.maxRetries) {
// 超过最大重试次数,移入死信队列
await redis.xadd(`${this.streamKey}:dead`, '*',
'originalId', id,
'data', JSON.stringify(data),
'error', error.message,
'retries', count.toString()
);
await redis.xack(this.streamKey, this.groupName, id);
await redis.del(`retry:${id}`);
console.warn(`[DeadLetter] ${id} moved to dead letter queue after ${count} retries`);
} else {
await redis.hset(`retry:${id}`, 'count', count.toString(), 'lastError', error.message);
console.log(`[Retry] ${id} will be retried (attempt ${count}/${this.maxRetries})`);
}
}
async claimStaleMessages() {
try {
const claimed = await redis.xautoclaim(
this.streamKey, this.groupName, this.consumerName,
30000, // 空闲 30 秒的消息
'0-0',
'COUNT', 5
);
// claimed[0] 是下一个 start ID,claimed[1] 是接管的消息
if (claimed[1] && claimed[1].length > 0) {
console.log(`[Consumer:${this.consumerName}] Claimed ${claimed[1].length} stale messages`);
for (const [id, fields] of claimed[1]) {
await this.processMessage(id, fields);
}
}
} catch (e) {
// XAUTOCLAIM 需要 Redis 6.2+
}
}
parseFields(fields) {
const obj = {};
for (let i = 0; i < fields.length; i += 2) {
obj[fields[i]] = fields[i + 1];
}
if (obj.data) obj.data = JSON.parse(obj.data);
return obj;
}
stop() {
this.running = false;
}
// 子类实现
async handle(data) { throw new Error('Not implemented'); }
}
// ==================== 具体消费者实现 ====================
class InventoryConsumer extends StreamConsumer {
constructor() {
super('orders', 'order-processors', 'inventory-worker');
}
async handle({ event, data }) {
if (event !== 'order.created') return;
console.log(`[Inventory] Reserving stock for order ${data.orderId}`);
// 模拟库存扣减
await new Promise(r => setTimeout(r, 50));
}
}
class PaymentConsumer extends StreamConsumer {
constructor() {
super('orders', 'order-processors', 'payment-worker');
}
async handle({ event, data }) {
if (event !== 'order.created') return;
console.log(`[Payment] Processing payment for order ${data.orderId}: ¥${data.amount}`);
// 模拟支付处理
await new Promise(r => setTimeout(r, 100));
}
}
// ==================== 启动示例 ====================
const producer = new OrderEventProducer();
const inventory = new InventoryConsumer();
const payment = new PaymentConsumer();
await inventory.init();
await payment.init();
// 并行启动消费者
Promise.all([inventory.start(), payment.start()]);
// 模拟订单创建
for (let i = 0; i < 20; i++) {
await producer.publish('order.created', {
orderId: `ORD-${1000 + i}`,
userId: `U-${Math.floor(Math.random() * 1000)}`,
amount: (Math.random() * 500 + 50).toFixed(2),
});
}
💡 提示:
XREADGROUP的>参数表示「只读取从未被任何消费者处理过的新消息」。如果要读取当前消费者自己未确认的消息(重试场景),使用0-0替代>。
📊 三、方案对比与选型指南
在选择消息队列方案时,需要根据实际场景权衡。以下是 Redis Streams 与主流方案的对比:
| 特性 | Redis Streams | Apache Kafka | RabbitMQ | BullMQ |
|---|---|---|---|---|
| 消息持久化 | ✅ 内存 + RDB/AOF | ✅ 磁盘 | ✅ 磁盘 | ✅ Redis |
| 消费者组 | ✅ 原生支持 | ✅ 原生支持 | ⚠️ 需插件 | ✅ 原生支持 |
| 消息回溯 | ✅ 按 ID 回溯 | ✅ 按 offset | ❌ 消费即删 | ❌ |
| 延迟 | ~0.1ms | ~2ms | ~1ms | ~0.5ms |
| 单节点吞吐 | ~100万 msg/s | ~10万 msg/s | ~5万 msg/s | ~50万 msg/s |
| 运维复杂度 | ⭐ 极低 | ⭐⭐⭐⭐ 高 | ⭐⭐⭐ 中 | ⭐ 极低 |
| 消息积压处理 | ✅ 内存可控 | ✅ 磁盘 | ⚠️ 内存压力 | ✅ Redis |
| 适合场景 | 中小规模实时 | 大规模日志流 | 复杂路由 | 任务队列 |
| 推荐指数 | ✅ 中小项目首选 | ✅ 大数据场景 | ⚠️ 复杂场景 | ✅ Node.js 项目 |
🎯 选型决策树
根据你的场景选择合适的方案:
- ✅ 选 Redis Streams:日消息量 < 1000 万,已有 Redis 基础设施,追求低运维成本
- ✅ 选 Kafka:日消息量 > 1 亿,需要长期消息保留(天/周级别),大数据生态集成
- ✅ 选 RabbitMQ:需要复杂路由规则(topic/fanout/direct),多协议支持
- ✅ 选 BullMQ:纯 Node.js 项目,需要任务调度(延迟任务、定时任务、优先级队列)
- ❌ 不要用 Redis Streams:消息不能丢失(金融级)、需要跨数据中心复制
⚠️ **警告:**Redis Streams 的消息存储在内存中,受
maxmemory限制。如果消息积压超过内存上限,根据淘汰策略(volatile-lru等)可能会丢失消息。务必配置XTRIM或MAXLEN来控制 stream 长度。
⚠️ 四、生产环境的坑点与最佳实践
🕳️ 4.1 常见坑点
在生产中使用 Redis Streams,以下是最常踩的坑:
坑点 1:忘记 ACK 导致内存泄漏
未确认的消息会一直保留在 PEL 中,长期积累会导致内存膨胀。需要定期监控 pending 数量并处理超时消息。
// 监控消费者组的健康状态
async function monitorConsumerGroup(streamKey, groupName) {
const pending = await redis.xpending(streamKey, groupName);
const [total, minId, maxId, consumers] = pending;
console.log(`Pending messages: ${total}`);
if (total > 1000) {
console.warn(`⚠️ 积压消息过多: ${total},需要检查消费者状态`);
}
// 检查各消费者的 pending 数量
for (const [consumerName, count] of consumers) {
console.log(` ${consumerName}: ${count} pending`);
if (count > 500) {
console.warn(` ⚠️ ${consumerName} 积压严重,可能已崩溃`);
}
}
}
坑点 2:Stream 无限增长撑爆内存
XADD 默认不会自动清理历史消息,必须使用 MAXLEN 或 MINID 参数:
// ❌ 错误写法:不控制 stream 大小
await redis.xadd('orders', '*', 'event', 'created', 'data', '...');
// ✅ 正确写法:使用 MAXLEN 限制最大长度(近似裁剪,性能更好)
await redis.xadd('orders', 'MAXLEN', '~', '100000', '*', 'event', 'created', 'data', '...');
// ✅ 更好:使用 MINID 按时间清理(保留最近 24 小时)
const oneDayAgo = Date.now() - 86400000;
await redis.xadd('orders', 'MINID', '~', `${oneDayAgo}-0`, '*', 'event', 'created');
// ✅ 定期修剪:适合大批量清理
await redis.xtrim('orders', 'MAXLEN', '~', '50000');
💡 提示:
~符号表示「近似裁剪」,Redis 会在内部以 100 条消息为单位批量删除,性能远优于精确裁剪。对于大多数场景,近似裁剪的结果已经足够好。
坑点 3:消费者名冲突导致消息「消失」
如果两个进程使用相同的消费者名加入同一个组,消息会在它们之间随机分配,导致看似「丢失」。确保每个消费者实例有唯一的名称(推荐使用 hostname + pid + uuid)。
✅ 4.2 生产环境最佳实践
- ✅ 为每个业务域创建独立的 Stream,不要把所有事件塞进一个 key
- ✅ 使用 MAXLEN/MINID 控制 Stream 大小,建议保留最近 24-72 小时的消息
- ✅ 实现死信队列,超过重试次数的消息不要丢弃,移到
stream:dead便于排查 - ✅ 监控 pending 消息数量,设置告警阈值(如单消费者 pending > 500)
- ✅ **使用 XAUTOCLAIM(Redis 6.2+)**自动接管超时消费者的消息
- ✅ 在业务逻辑幂等后才 ACK,确保消息不会因处理中途失败而丢失
- ❌ 不要用 NOACK,除非你能接受消息丢失
- ❌ 不要在一个 Stream 中混合不同类型的事件,会导致消费者过滤效率低下
- ❌ 不要依赖 Redis Streams 做跨数据中心复制,它只支持单节点或主从复制
📐 4.3 容量规划参考
根据实际场景估算 Redis Streams 的内存占用:
| 消息大小 | 每日消息量 | 保留时长 | 预估内存占用 |
|---|---|---|---|
| 500 bytes | 100 万 | 24 小时 | ~500 MB |
| 1 KB | 100 万 | 24 小时 | ~1 GB |
| 1 KB | 1000 万 | 24 小时 | ~10 GB |
| 2 KB | 1000 万 | 72 小时 | ~60 GB |
⚠️ **警告:**以上为粗略估算,实际内存占用受 Redis 内部编码、字段名复用率等因素影响。建议在预生产环境用实际数据做基准测试。如果消息量超过单机内存容量,考虑分片(多个 stream key)或降级到 Kafka。
🔧 五、与 Node.js 生态的集成
Redis Streams 与 Node.js 生态有很好的集成,以下是常用库的对比:
| 库 | 特点 | 推荐场景 |
|---|---|---|
ioredis |
底层 API,完全控制 | 需要精细调优的生产场景 |
bullmq |
高层封装,内置任务调度 | 纯任务队列场景 |
better-queue |
轻量级,简单易用 | 小项目快速原型 |
对于大多数 Node.js 项目,直接使用 ioredis 操作 Streams API 是最灵活的方式。如果你的需求是任务队列(延迟执行、优先级、定时任务),bullmq 是更好的选择,它在 Redis Streams 之上封装了更高级的功能。
📝 总结
Redis Streams 是一个被严重低估的消息队列方案。它不像 Kafka 那样「大而全」,但在中小规模场景下,它的低运维成本、亚毫秒延迟、与现有 Redis 基础设施的无缝集成,使它成为非常务实的选择。
⚡ **关键结论:**如果你的日消息量在 1000 万以下,已有 Redis 实例,且不需要跨数据中心复制,Redis Streams 几乎总是比引入 Kafka 更好的选择。它让你在不增加运维复杂度的前提下,获得可靠的消息投递能力。
相关工具推荐:
- 🔧 Redis 官方文档 — Streams — 完整的 Streams API 参考
- 🔧 ioredis — Node.js 最流行的 Redis 客户端
- 🔧 BullMQ — 基于 Redis Streams 的高级任务队列
- 🔧 Redis Insight — 可视化查看 Streams 数据