2024 年 Stripe 的工程博客披露了一组数据:他们的消息系统每天处理超过 10 亿条消息,峰值吞吐量达到每秒 100 万条。消息队列(Message Queue)不是什么新概念,但它依然是分布式系统中最关键的基础设施之一。大多数开发者会直接使用 RabbitMQ 或 Kafka,却很少深入理解其内部机制。这篇文章将带你用 JavaScript 从零实现一个具备持久化、消费者组和确认机制的消息队列,帮助你在生产环境中做出更明智的架构决策。
🏗️ 一、消息队列的核心架构与设计决策
1.1 为什么需要消息队列?
在同步调用的世界里,服务 A 调用服务 B,必须等待 B 返回才能继续。这在简单场景下没问题,但在高并发、高可用的系统中会带来三个致命问题:
- 耦合性:服务 A 必须知道服务 B 的地址,B 挂了 A 也受影响
- 削峰能力差:流量突增时,下游服务直接被打垮
- 扩展性差:想加一个消费者处理能力,得改代码
消息队列的核心价值就是解耦、削峰、异步。但这些抽象的好处背后,是一个精密的存储引擎和调度系统。
1.2 核心架构模型
一个最小可行的消息队列包含以下组件:
Producer → [Broker (Queue/Topic)] → Consumer
| 组件 | 职责 | 关键设计点 |
|---|---|---|
| Producer(生产者) | 发送消息到 Broker | 异步发送、批量发送、重试策略 |
| Broker(代理) | 存储和转发消息 | 持久化策略、索引结构、内存管理 |
| Consumer(消费者) | 从 Broker 拉取并处理消息 | 拉取 vs 推送、确认机制、重平衡 |
| Topic/Queue(主题/队列) | 消息的逻辑分区 | 分区策略、顺序保证、TTL |
📌 **记住:**队列(Queue)和主题(Topic)是两个不同的模型。队列是点对点(一条消息只被一个消费者处理),主题是发布/订阅(一条消息可以被多个消费者组分别消费)。现代消息队列通常基于 Topic 模型。
1.3 存储引擎的选择:追加日志 vs 链表
消息队列的存储引擎决定了它的性能上限。主流方案有两种:
| 方案 | 原理 | 优势 | 劣势 |
|---|---|---|---|
| 追加日志(Append-Only Log) | 消息顺序追加到文件,用偏移量索引 | 写入极快(顺序 I/O)、吞吐量高 | 删除旧消息需要压缩(Compaction) |
| 链表/树结构 | 消息用链表或 B+ 树组织 | 随机访问灵活、精确删除 | 随机 I/O 慢、吞吐量低 |
Kafka 和 RocketMQ 都选择了追加日志方案,因为消息队列的核心场景是顺序写入、顺序读取,这完美匹配了磁盘的物理特性。我们的实现也采用这个方案。
⚠️ **警告:**不要在内存中用普通数组或 Map 存储消息!看似简单,但无法持久化,重启数据全丢,内存也会无限增长。
🔧 二、从零实现:代码实战
2.1 基础 Broker 实现
我们先实现一个最小的 Broker,支持基本的消息存储和消费:
// broker.js — 消息队列核心引擎
const EventEmitter = require('events');
const crypto = require('crypto');
class Message {
constructor(topic, payload) {
this.id = crypto.randomUUID();
this.topic = topic;
this.payload = payload;
this.timestamp = Date.now();
this.offset = 0;
this.deliveryCount = 0;
}
}
class TopicPartition {
constructor(topic, partitionId = 0) {
this.topic = topic;
this.partitionId = partitionId;
this.messages = []; // 内存缓冲
this.maxSize = 100000; // 最大消息数
this.committedOffsets = new Map(); // consumerGroup -> offset
}
// 生产消息:追加到分区末尾
append(message) {
if (this.messages.length >= this.maxSize) {
throw new Error(`Topic ${this.topic} partition ${this.partitionId} is full`);
}
message.offset = this.messages.length;
this.messages.push(message);
return message.offset;
}
// 消费消息:从指定偏移量拉取
fetch(consumerGroup, batchSize = 10) {
const committed = this.committedOffsets.get(consumerGroup) || 0;
const start = committed;
const end = Math.min(start + batchSize, this.messages.length);
if (start >= end) return [];
const batch = this.messages.slice(start, end);
batch.forEach(msg => msg.deliveryCount++);
return batch;
}
// 确认消费:提交偏移量
commit(consumerGroup, offset) {
const current = this.committedOffsets.get(consumerGroup) || 0;
if (offset > current) {
this.committedOffsets.set(consumerGroup, offset);
}
}
// 获取待消费消息数量
pendingCount(consumerGroup) {
const committed = this.committedOffsets.get(consumerGroup) || 0;
return this.messages.length - committed;
}
}
class Broker extends EventEmitter {
constructor(options = {}) {
super();
this.topics = new Map(); // topic -> TopicPartition[]
this.numPartitions = options.numPartitions || 4;
this.maxRetries = options.maxRetries || 3;
}
// 创建主题
createTopic(topic) {
if (this.topics.has(topic)) return;
const partitions = [];
for (let i = 0; i < this.numPartitions; i++) {
partitions.push(new TopicPartition(topic, i));
}
this.topics.set(topic, partitions);
this.emit('topic:created', topic);
}
// 选择分区(轮询策略)
selectPartition(topic) {
const partitions = this.topics.get(topic);
if (!partitions) throw new Error(`Topic ${topic} not found`);
return partitions[Math.floor(Math.random() * partitions.length)];
}
// 生产消息
produce(topic, payload) {
const partition = this.selectPartition(topic);
const message = new Message(topic, payload);
const offset = partition.append(message);
this.emit('message:produced', { topic, partition: partition.partitionId, offset });
return { messageId: message.id, partition: partition.partitionId, offset };
}
// 消费消息
consume(topic, consumerGroup, batchSize = 10) {
const partitions = this.topics.get(topic);
if (!partitions) throw new Error(`Topic ${topic} not found`);
const allMessages = [];
for (const partition of partitions) {
const messages = partition.fetch(consumerGroup, batchSize);
allMessages.push(...messages);
}
// 按时间排序,保证全局有序(仅在单分区内有意义)
allMessages.sort((a, b) => a.timestamp - b.timestamp);
return allMessages.slice(0, batchSize);
}
// 确认消息
ack(topic, consumerGroup, partitionId, offset) {
const partitions = this.topics.get(topic);
if (!partitions) throw new Error(`Topic ${topic} not found`);
partitions[partitionId].commit(consumerGroup, offset + 1);
this.emit('message:acked', { topic, partition: partitionId, offset });
}
// 获取主题状态
getTopicStats(topic) {
const partitions = this.topics.get(topic);
if (!partitions) return null;
return {
topic,
partitions: partitions.map(p => ({
id: p.partitionId,
messageCount: p.messages.length,
committedOffsets: Object.fromEntries(p.committedOffsets),
})),
totalMessages: partitions.reduce((sum, p) => sum + p.messages.length, 0),
};
}
}
module.exports = { Broker, Message, TopicPartition };
💡 提示:
TopicPartition的fetch方法不会移除消息,只是读取。消息只在压缩(Compaction)时才被物理删除。这是 Kafka 的核心设计哲学——消费者自己维护偏移量,Broker 只负责存储。
2.2 持久化引擎:写入磁盘
内存存储在重启后数据全丢。我们需要一个持久化层。核心思路是用预写日志(Write-Ahead Log, WAL)——消息先写入磁盘日志文件,再返回成功:
// persist.js — 基于文件的持久化引擎
const fs = require('fs');
const path = require('path');
class PersistEngine {
constructor(baseDir, options = {}) {
this.baseDir = baseDir;
this.maxFileSize = options.maxFileSize || 64 * 1024 * 1024; // 64MB
this.syncWrites = options.syncWrites || false;
this.fileHandles = new Map();
// 确保目录存在
fs.mkdirSync(baseDir, { recursive: true });
}
// 获取日志文件路径
getLogPath(topic, partitionId) {
const dir = path.join(this.baseDir, topic, String(partitionId));
fs.mkdirSync(dir, { recursive: true });
// 根据当前文件编号生成路径
const fileNum = this._getCurrentFileNum(dir);
return path.join(dir, `${String(fileNum).padStart(8, '0')}.log`);
}
_getCurrentFileNum(dir) {
const files = fs.readdirSync(dir).filter(f => f.endsWith('.log')).sort();
return files.length > 0 ? parseInt(files[files.length - 1], 10) : 0;
}
// 追加写入消息(二进制格式)
write(topic, partitionId, message) {
const filePath = this.getLogPath(topic, partitionId);
const json = JSON.stringify(message);
const buffer = Buffer.from(json, 'utf-8');
// 写入格式:[4字节长度][JSON数据]
const lenBuf = Buffer.alloc(4);
lenBuf.writeUInt32BE(buffer.length, 0);
const record = Buffer.concat([lenBuf, buffer]);
// 检查文件大小,必要时切换新文件
try {
const stat = fs.statSync(filePath);
if (stat.size + record.length > this.maxFileSize) {
const dir = path.dirname(filePath);
const fileNum = this._getCurrentFileNum(dir) + 1;
const newPath = path.join(dir, `${String(fileNum).padStart(8, '0')}.log`);
fs.appendFileSync(newPath, record);
return;
}
} catch (e) {
// 文件不存在,直接写入
}
fs.appendFileSync(filePath, record, { flag: this.syncWrites ? 'rs' : 'a' });
}
// 读取消息
read(topic, partitionId, startOffset = 0, count = 100) {
const dir = path.join(this.baseDir, topic, String(partitionId));
if (!fs.existsSync(dir)) return [];
const files = fs.readdirSync(dir).filter(f => f.endsWith('.log')).sort();
const messages = [];
let currentOffset = 0;
for (const file of files) {
const filePath = path.join(dir, file);
const data = fs.readFileSync(filePath);
let pos = 0;
while (pos < data.length) {
if (pos + 4 > data.length) break;
const len = data.readUInt32BE(pos);
pos += 4;
if (pos + len > data.length) break;
const json = data.slice(pos, pos + len).toString('utf-8');
pos += len;
if (currentOffset >= startOffset) {
messages.push(JSON.parse(json));
if (messages.length >= count) return messages;
}
currentOffset++;
}
}
return messages;
}
// 获取消息总数
getMessageCount(topic, partitionId) {
const dir = path.join(this.baseDir, topic, String(partitionId));
if (!fs.existsSync(dir)) return 0;
const files = fs.readdirSync(dir).filter(f => f.endsWith('.log'));
let count = 0;
for (const file of files) {
const data = fs.readFileSync(path.join(dir, file));
let pos = 0;
while (pos < data.length) {
if (pos + 4 > data.length) break;
const len = data.readUInt32BE(pos);
pos += 4 + len;
count++;
}
}
return count;
}
}
module.exports = { PersistEngine };
⚠️ **警告:**这里的
appendFileSync是同步操作,在高吞吐场景下会阻塞事件循环。生产环境中应该使用fs.open获取文件描述符,配合writev批量写入或使用pipeline流式处理。
2.3 消费者组与死信队列
消费者组(Consumer Group)是消息队列的核心抽象。同一个组内的消费者分摊消息,不同组独立消费。当消息重试多次仍然失败时,需要移入死信队列(Dead Letter Queue, DLQ):
// consumer-group.js — 消费者组与死信队列
const EventEmitter = require('events');
class ConsumerGroup extends EventEmitter {
constructor(broker, groupId, options = {}) {
super();
this.broker = broker;
this.groupId = groupId;
this.maxRetries = options.maxRetries || 3;
this.retryDelay = options.retryDelay || 1000; // 重试间隔 ms
this.pollInterval = options.pollInterval || 100; // 拉取间隔 ms
this.topics = new Map(); // topic -> { handler, running }
this.deadLetters = []; // 死信队列
this.processed = 0;
this.failed = 0;
}
// 订阅主题并注册处理函数
subscribe(topic, handler) {
this.broker.createTopic(topic); // 确保主题存在
// 创建死信主题
this.broker.createTopic(`${topic}.DLQ`);
this.topics.set(topic, { handler, running: false });
}
// 启动消费
async start() {
for (const [topic, config] of this.topics) {
if (config.running) continue;
config.running = true;
this._pollLoop(topic, config);
console.log(`[ConsumerGroup:${this.groupId}] 开始消费 ${topic}`);
}
}
// 停止消费
stop() {
for (const [topic, config] of this.topics) {
config.running = false;
}
console.log(`[ConsumerGroup:${this.groupId}] 已停止`);
}
// 拉取循环
async _pollLoop(topic, config) {
while (config.running) {
try {
const messages = this.broker.consume(topic, this.groupId, 10);
for (const message of messages) {
await this._processWithRetry(topic, message, config.handler);
// 确认消息
this.broker.ack(topic, this.groupId, message.partition, message.offset);
}
} catch (err) {
console.error(`[ConsumerGroup:${this.groupId}] 消费错误:`, err.message);
}
// 等待下次拉取
await new Promise(r => setTimeout(r, this.pollInterval));
}
}
// 带重试的消息处理
async _processWithRetry(topic, message, handler) {
let lastError;
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
await handler(message.payload, {
messageId: message.id,
attempt,
topic: message.topic,
timestamp: message.timestamp,
});
this.processed++;
return;
} catch (err) {
lastError = err;
console.warn(
`[${this.groupId}] 消息 ${message.id} 第 ${attempt} 次重试失败: ${err.message}`
);
if (attempt < this.maxRetries) {
// 指数退避
await new Promise(r => setTimeout(r, this.retryDelay * Math.pow(2, attempt - 1)));
}
}
}
// 重试耗尽,移入死信队列
this.failed++;
const dlqMessage = {
...message,
originalTopic: topic,
errorMessage: lastError.message,
failedAt: Date.now(),
retryCount: this.maxRetries,
};
this.broker.produce(`${topic}.DLQ`, dlqMessage);
this.deadLetters.push(dlqMessage);
this.emit('message:dead', dlqMessage);
console.error(`[${this.groupId}] 消息 ${message.id} 已移入 DLQ`);
}
// 获取统计信息
getStats() {
return {
groupId: this.groupId,
processed: this.processed,
failed: this.failed,
deadLetters: this.deadLetters.length,
subscribedTopics: [...this.topics.keys()],
};
}
}
module.exports = { ConsumerGroup };
使用示例——模拟一个电商订单处理系统:
// example.js — 电商订单处理场景
const { Broker } = require('./broker');
const { ConsumerGroup } = require('./consumer-group');
const broker = new Broker({ numPartitions: 4 });
// 创建订单消费者组
const orderGroup = new ConsumerGroup(broker, 'order-processor', {
maxRetries: 3,
retryDelay: 500,
});
// 模拟不稳定的外部支付 API
const paymentAPI = async (order) => {
if (Math.random() < 0.3) throw new Error('Payment gateway timeout');
return { status: 'success', transactionId: `TXN-${Date.now()}` };
};
// 订阅订单主题
orderGroup.subscribe('orders', async (payload, meta) => {
console.log(`处理订单 ${payload.orderId} (第 ${meta.attempt} 次)`);
// 1. 调用支付接口
const payment = await paymentAPI(payload);
console.log(` ✅ 支付成功: ${payment.transactionId}`);
// 2. 更新库存(模拟)
console.log(` ✅ 库存已扣减: ${payload.items.length} 件商品`);
// 3. 发送通知(模拟)
console.log(` ✅ 通知已发送: ${payload.customerEmail}`);
});
// 监听死信事件
orderGroup.on('message:dead', (msg) => {
console.log(` ❌ 订单 ${msg.payload.orderId} 进入死信队列`);
});
// 启动消费
orderGroup.start();
// 生产测试消息
for (let i = 1; i <= 10; i++) {
broker.produce('orders', {
orderId: `ORD-${String(i).padStart(5, '0')}`,
customerEmail: `user${i}@example.com`,
items: [{ sku: 'SKU-A', qty: i }],
total: i * 99.9,
});
}
// 5 秒后查看统计
setTimeout(() => {
console.log('\n📊 消费统计:', JSON.stringify(orderGroup.getStats(), null, 2));
orderGroup.stop();
process.exit(0);
}, 5000);
📊 三、性能对比与生产级思考
3.1 方案对比
在选择消息队列方案时,你需要理解每种方案的核心差异:
| 特性 | 本实现(教学版) | RabbitMQ | Kafka | Redis Streams |
|---|---|---|---|---|
| 吞吐量(单机) | ~5,000 msg/s | ~30,000 msg/s | ~100万 msg/s | ~50,000 msg/s |
| 持久化 | 文件追加 | Erlang Mnesia | 追加日志 + 分段 | RDB/AOF |
| 消费者组 | ✅ 手动实现 | ✅ 原生支持 | ✅ 原生支持 | ✅ 原生支持 |
| 消息顺序 | 单分区内有序 | 队列内有序 | 分区内有序 | Stream 内有序 |
| 延迟(P99) | ~5ms | ~1ms | ~5ms | ~0.5ms |
| 适用场景 | 学习与原型 | 复杂路由、RPC | 大数据流、日志 | 轻量级队列 |
| 运维复杂度 | 低 | 中 | 高 | 低 |
⚡ **关键结论:**如果你的消息量在每秒 1000 条以下、不需要复杂的路由规则,Redis Streams 是最佳选择——运维简单、性能够用、原生支持消费者组。超过每秒 10,000 条再考虑 Kafka。
3.2 生产环境避坑指南
在实际项目中,消息队列踩坑最多的地方不是技术实现,而是设计决策:
坑点 1:消息丢失三连击
消息丢失可能发生在三个环节:
- ❌ 生产者发送失败就丢了 → ✅ 使用发送确认(Publisher Confirm),失败后重试
- ❌ Broker 宕机内存消息丢失 → ✅ 开启持久化,
syncWrites: true - ❌ 消费者处理到一半挂了 → ✅ 手动确认(Manual ACK),处理完再提交偏移量
// ❌ 错误写法:自动确认,消息可能丢失
const messages = broker.consume('orders', 'group1');
messages.forEach(msg => processMessage(msg)); // 处理失败就丢了
// ✅ 正确写法:手动确认,逐条处理
const messages = broker.consume('orders', 'group1');
for (const msg of messages) {
try {
await processMessage(msg);
broker.ack('orders', 'group1', msg.partition, msg.offset); // 处理成功才确认
} catch (err) {
console.error('处理失败,等待重试:', err.message);
break; // 不确认,下次会重新消费
}
}
坑点 2:消息重复消费
网络抖动、消费者超时重启都会导致同一条消息被处理多次。解决方案是幂等性设计——让同一个操作执行多次和执行一次的效果相同:
// 幂等性设计:用数据库唯一索引去重
const processedMessages = new Set();
async function processMessageIdempotent(message) {
if (processedMessages.has(message.id)) {
console.log(`消息 ${message.id} 已处理,跳过`);
return;
}
await saveOrder(message.payload);
processedMessages.add(message.id);
}
坑点 3:消费者重平衡风暴
当一个消费者加入或离开组时,会触发重平衡(Rebalance)。在 Kafka 中,重平衡期间所有消费者停止消费。如果消费者处理时间过长,频繁超时触发重平衡,就会陷入「重平衡风暴」。
✅ 解决方案:缩短单次处理时间、增加会话超时时间、使用增量重平衡协议(Kafka 4.0+)。
3.3 如何选择消息队列?
根据你的实际场景做决策:
- ✅ 原型验证、轻量级任务:Redis Streams 或本教学实现
- ✅ 微服务间通信、RPC 调用:RabbitMQ(AMQP 协议成熟,路由灵活)
- ✅ 大数据流处理、日志收集:Kafka(分区 + 副本 = 高吞吐 + 高可用)
- ✅ 云原生、Serverless:AWS SQS / Google Pub/Sub(免运维)
- ❌ 不要用 Redis List 做消息队列(
LPUSH/BRPOP没有消费者组、没有确认机制) - ❌ 不要用数据库表轮询做队列(性能差、锁竞争严重)
💡 提示:如果你的团队规模小、消息量不大,选择运维最简单的方案。消息队列本身不是瓶颈,运维复杂度才是。一个需要 3 人专职运维的 Kafka 集群,对小团队来说是负资产。
💡 总结
从零实现消息队列的过程,揭示了几个关键认知:
- 追加日志是一切的基础。Kafka 的高性能不是魔法,而是顺序 I/O + 零拷贝 + 批量处理的组合拳
- 消费者组解决了水平扩展问题,但也引入了重平衡的复杂度
- 死信队列不是可选功能,而是生产环境的必需品——没有 DLQ,你永远不知道哪些消息处理失败了
- 选择消息队列的标准不是吞吐量,而是运维成本和团队熟悉度
作为开发者,理解消息队列的内部原理不是为了造轮子,而是在出现生产事故时能快速定位问题。当你知道消息在 Broker 中是以追加日志存储的,你就能理解为什么「消费慢」不会导致 Broker 阻塞;当你知道消费者组的重平衡机制,你就能理解为什么「消费停顿了几秒钟」。
⚡ **关键结论:**消息队列是分布式系统的基石。先理解原理,再选择工具,最后在生产中持续优化。这个顺序不能反。
相关工具推荐:
- 🔧 jsjson.com JSON 工具 — 调试消息 Payload 时格式化 JSON
- 🔧 Redpanda — Kafka 的 C++ 替代品,运维更简单
- 🔧 BullMQ — Node.js 生态最成熟的 Redis 任务队列
- 🔧 NATS JetStream — 轻量级但功能完整的消息系统