从零实现消息队列:原理、代码与生产级思考

用 JavaScript 从零构建一个完整的消息队列系统,深入理解生产者、消费者、持久化、消费者组等核心机制,附完整可运行代码与性能对比分析。

数据结构与算法 2026-06-09 15 分钟

2024 年 Stripe 的工程博客披露了一组数据:他们的消息系统每天处理超过 10 亿条消息,峰值吞吐量达到每秒 100 万条。消息队列(Message Queue)不是什么新概念,但它依然是分布式系统中最关键的基础设施之一。大多数开发者会直接使用 RabbitMQ 或 Kafka,却很少深入理解其内部机制。这篇文章将带你用 JavaScript 从零实现一个具备持久化、消费者组和确认机制的消息队列,帮助你在生产环境中做出更明智的架构决策。

🏗️ 一、消息队列的核心架构与设计决策

1.1 为什么需要消息队列?

在同步调用的世界里,服务 A 调用服务 B,必须等待 B 返回才能继续。这在简单场景下没问题,但在高并发、高可用的系统中会带来三个致命问题:

  • 耦合性:服务 A 必须知道服务 B 的地址,B 挂了 A 也受影响
  • 削峰能力差:流量突增时,下游服务直接被打垮
  • 扩展性差:想加一个消费者处理能力,得改代码

消息队列的核心价值就是解耦、削峰、异步。但这些抽象的好处背后,是一个精密的存储引擎和调度系统。

1.2 核心架构模型

一个最小可行的消息队列包含以下组件:

Producer → [Broker (Queue/Topic)] → Consumer
组件 职责 关键设计点
Producer(生产者) 发送消息到 Broker 异步发送、批量发送、重试策略
Broker(代理) 存储和转发消息 持久化策略、索引结构、内存管理
Consumer(消费者) 从 Broker 拉取并处理消息 拉取 vs 推送、确认机制、重平衡
Topic/Queue(主题/队列) 消息的逻辑分区 分区策略、顺序保证、TTL

📌 **记住:**队列(Queue)和主题(Topic)是两个不同的模型。队列是点对点(一条消息只被一个消费者处理),主题是发布/订阅(一条消息可以被多个消费者组分别消费)。现代消息队列通常基于 Topic 模型。

1.3 存储引擎的选择:追加日志 vs 链表

消息队列的存储引擎决定了它的性能上限。主流方案有两种:

方案 原理 优势 劣势
追加日志(Append-Only Log) 消息顺序追加到文件,用偏移量索引 写入极快(顺序 I/O)、吞吐量高 删除旧消息需要压缩(Compaction)
链表/树结构 消息用链表或 B+ 树组织 随机访问灵活、精确删除 随机 I/O 慢、吞吐量低

Kafka 和 RocketMQ 都选择了追加日志方案,因为消息队列的核心场景是顺序写入、顺序读取,这完美匹配了磁盘的物理特性。我们的实现也采用这个方案。

⚠️ **警告:**不要在内存中用普通数组或 Map 存储消息!看似简单,但无法持久化,重启数据全丢,内存也会无限增长。

🔧 二、从零实现:代码实战

2.1 基础 Broker 实现

我们先实现一个最小的 Broker,支持基本的消息存储和消费:

// broker.js — 消息队列核心引擎
const EventEmitter = require('events');
const crypto = require('crypto');

class Message {
  constructor(topic, payload) {
    this.id = crypto.randomUUID();
    this.topic = topic;
    this.payload = payload;
    this.timestamp = Date.now();
    this.offset = 0;
    this.deliveryCount = 0;
  }
}

class TopicPartition {
  constructor(topic, partitionId = 0) {
    this.topic = topic;
    this.partitionId = partitionId;
    this.messages = [];        // 内存缓冲
    this.maxSize = 100000;     // 最大消息数
    this.committedOffsets = new Map(); // consumerGroup -> offset
  }

  // 生产消息:追加到分区末尾
  append(message) {
    if (this.messages.length >= this.maxSize) {
      throw new Error(`Topic ${this.topic} partition ${this.partitionId} is full`);
    }
    message.offset = this.messages.length;
    this.messages.push(message);
    return message.offset;
  }

  // 消费消息:从指定偏移量拉取
  fetch(consumerGroup, batchSize = 10) {
    const committed = this.committedOffsets.get(consumerGroup) || 0;
    const start = committed;
    const end = Math.min(start + batchSize, this.messages.length);
    if (start >= end) return [];

    const batch = this.messages.slice(start, end);
    batch.forEach(msg => msg.deliveryCount++);
    return batch;
  }

  // 确认消费:提交偏移量
  commit(consumerGroup, offset) {
    const current = this.committedOffsets.get(consumerGroup) || 0;
    if (offset > current) {
      this.committedOffsets.set(consumerGroup, offset);
    }
  }

  // 获取待消费消息数量
  pendingCount(consumerGroup) {
    const committed = this.committedOffsets.get(consumerGroup) || 0;
    return this.messages.length - committed;
  }
}

class Broker extends EventEmitter {
  constructor(options = {}) {
    super();
    this.topics = new Map();           // topic -> TopicPartition[]
    this.numPartitions = options.numPartitions || 4;
    this.maxRetries = options.maxRetries || 3;
  }

  // 创建主题
  createTopic(topic) {
    if (this.topics.has(topic)) return;
    const partitions = [];
    for (let i = 0; i < this.numPartitions; i++) {
      partitions.push(new TopicPartition(topic, i));
    }
    this.topics.set(topic, partitions);
    this.emit('topic:created', topic);
  }

  // 选择分区(轮询策略)
  selectPartition(topic) {
    const partitions = this.topics.get(topic);
    if (!partitions) throw new Error(`Topic ${topic} not found`);
    return partitions[Math.floor(Math.random() * partitions.length)];
  }

  // 生产消息
  produce(topic, payload) {
    const partition = this.selectPartition(topic);
    const message = new Message(topic, payload);
    const offset = partition.append(message);
    this.emit('message:produced', { topic, partition: partition.partitionId, offset });
    return { messageId: message.id, partition: partition.partitionId, offset };
  }

  // 消费消息
  consume(topic, consumerGroup, batchSize = 10) {
    const partitions = this.topics.get(topic);
    if (!partitions) throw new Error(`Topic ${topic} not found`);

    const allMessages = [];
    for (const partition of partitions) {
      const messages = partition.fetch(consumerGroup, batchSize);
      allMessages.push(...messages);
    }

    // 按时间排序,保证全局有序(仅在单分区内有意义)
    allMessages.sort((a, b) => a.timestamp - b.timestamp);
    return allMessages.slice(0, batchSize);
  }

  // 确认消息
  ack(topic, consumerGroup, partitionId, offset) {
    const partitions = this.topics.get(topic);
    if (!partitions) throw new Error(`Topic ${topic} not found`);
    partitions[partitionId].commit(consumerGroup, offset + 1);
    this.emit('message:acked', { topic, partition: partitionId, offset });
  }

  // 获取主题状态
  getTopicStats(topic) {
    const partitions = this.topics.get(topic);
    if (!partitions) return null;
    return {
      topic,
      partitions: partitions.map(p => ({
        id: p.partitionId,
        messageCount: p.messages.length,
        committedOffsets: Object.fromEntries(p.committedOffsets),
      })),
      totalMessages: partitions.reduce((sum, p) => sum + p.messages.length, 0),
    };
  }
}

module.exports = { Broker, Message, TopicPartition };

💡 提示:TopicPartitionfetch 方法不会移除消息,只是读取。消息只在压缩(Compaction)时才被物理删除。这是 Kafka 的核心设计哲学——消费者自己维护偏移量,Broker 只负责存储。

2.2 持久化引擎:写入磁盘

内存存储在重启后数据全丢。我们需要一个持久化层。核心思路是用预写日志(Write-Ahead Log, WAL)——消息先写入磁盘日志文件,再返回成功:

// persist.js — 基于文件的持久化引擎
const fs = require('fs');
const path = require('path');

class PersistEngine {
  constructor(baseDir, options = {}) {
    this.baseDir = baseDir;
    this.maxFileSize = options.maxFileSize || 64 * 1024 * 1024; // 64MB
    this.syncWrites = options.syncWrites || false;
    this.fileHandles = new Map();

    // 确保目录存在
    fs.mkdirSync(baseDir, { recursive: true });
  }

  // 获取日志文件路径
  getLogPath(topic, partitionId) {
    const dir = path.join(this.baseDir, topic, String(partitionId));
    fs.mkdirSync(dir, { recursive: true });
    // 根据当前文件编号生成路径
    const fileNum = this._getCurrentFileNum(dir);
    return path.join(dir, `${String(fileNum).padStart(8, '0')}.log`);
  }

  _getCurrentFileNum(dir) {
    const files = fs.readdirSync(dir).filter(f => f.endsWith('.log')).sort();
    return files.length > 0 ? parseInt(files[files.length - 1], 10) : 0;
  }

  // 追加写入消息(二进制格式)
  write(topic, partitionId, message) {
    const filePath = this.getLogPath(topic, partitionId);
    const json = JSON.stringify(message);
    const buffer = Buffer.from(json, 'utf-8');

    // 写入格式:[4字节长度][JSON数据]
    const lenBuf = Buffer.alloc(4);
    lenBuf.writeUInt32BE(buffer.length, 0);
    const record = Buffer.concat([lenBuf, buffer]);

    // 检查文件大小,必要时切换新文件
    try {
      const stat = fs.statSync(filePath);
      if (stat.size + record.length > this.maxFileSize) {
        const dir = path.dirname(filePath);
        const fileNum = this._getCurrentFileNum(dir) + 1;
        const newPath = path.join(dir, `${String(fileNum).padStart(8, '0')}.log`);
        fs.appendFileSync(newPath, record);
        return;
      }
    } catch (e) {
      // 文件不存在,直接写入
    }

    fs.appendFileSync(filePath, record, { flag: this.syncWrites ? 'rs' : 'a' });
  }

  // 读取消息
  read(topic, partitionId, startOffset = 0, count = 100) {
    const dir = path.join(this.baseDir, topic, String(partitionId));
    if (!fs.existsSync(dir)) return [];

    const files = fs.readdirSync(dir).filter(f => f.endsWith('.log')).sort();
    const messages = [];
    let currentOffset = 0;

    for (const file of files) {
      const filePath = path.join(dir, file);
      const data = fs.readFileSync(filePath);
      let pos = 0;

      while (pos < data.length) {
        if (pos + 4 > data.length) break;
        const len = data.readUInt32BE(pos);
        pos += 4;
        if (pos + len > data.length) break;

        const json = data.slice(pos, pos + len).toString('utf-8');
        pos += len;

        if (currentOffset >= startOffset) {
          messages.push(JSON.parse(json));
          if (messages.length >= count) return messages;
        }
        currentOffset++;
      }
    }

    return messages;
  }

  // 获取消息总数
  getMessageCount(topic, partitionId) {
    const dir = path.join(this.baseDir, topic, String(partitionId));
    if (!fs.existsSync(dir)) return 0;

    const files = fs.readdirSync(dir).filter(f => f.endsWith('.log'));
    let count = 0;

    for (const file of files) {
      const data = fs.readFileSync(path.join(dir, file));
      let pos = 0;
      while (pos < data.length) {
        if (pos + 4 > data.length) break;
        const len = data.readUInt32BE(pos);
        pos += 4 + len;
        count++;
      }
    }

    return count;
  }
}

module.exports = { PersistEngine };

⚠️ **警告:**这里的 appendFileSync 是同步操作,在高吞吐场景下会阻塞事件循环。生产环境中应该使用 fs.open 获取文件描述符,配合 writev 批量写入或使用 pipeline 流式处理。

2.3 消费者组与死信队列

消费者组(Consumer Group)是消息队列的核心抽象。同一个组内的消费者分摊消息,不同组独立消费。当消息重试多次仍然失败时,需要移入死信队列(Dead Letter Queue, DLQ):

// consumer-group.js — 消费者组与死信队列
const EventEmitter = require('events');

class ConsumerGroup extends EventEmitter {
  constructor(broker, groupId, options = {}) {
    super();
    this.broker = broker;
    this.groupId = groupId;
    this.maxRetries = options.maxRetries || 3;
    this.retryDelay = options.retryDelay || 1000;     // 重试间隔 ms
    this.pollInterval = options.pollInterval || 100;   // 拉取间隔 ms
    this.topics = new Map();    // topic -> { handler, running }
    this.deadLetters = [];      // 死信队列
    this.processed = 0;
    this.failed = 0;
  }

  // 订阅主题并注册处理函数
  subscribe(topic, handler) {
    this.broker.createTopic(topic); // 确保主题存在
    // 创建死信主题
    this.broker.createTopic(`${topic}.DLQ`);
    this.topics.set(topic, { handler, running: false });
  }

  // 启动消费
  async start() {
    for (const [topic, config] of this.topics) {
      if (config.running) continue;
      config.running = true;
      this._pollLoop(topic, config);
      console.log(`[ConsumerGroup:${this.groupId}] 开始消费 ${topic}`);
    }
  }

  // 停止消费
  stop() {
    for (const [topic, config] of this.topics) {
      config.running = false;
    }
    console.log(`[ConsumerGroup:${this.groupId}] 已停止`);
  }

  // 拉取循环
  async _pollLoop(topic, config) {
    while (config.running) {
      try {
        const messages = this.broker.consume(topic, this.groupId, 10);
        for (const message of messages) {
          await this._processWithRetry(topic, message, config.handler);
          // 确认消息
          this.broker.ack(topic, this.groupId, message.partition, message.offset);
        }
      } catch (err) {
        console.error(`[ConsumerGroup:${this.groupId}] 消费错误:`, err.message);
      }

      // 等待下次拉取
      await new Promise(r => setTimeout(r, this.pollInterval));
    }
  }

  // 带重试的消息处理
  async _processWithRetry(topic, message, handler) {
    let lastError;
    for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
      try {
        await handler(message.payload, {
          messageId: message.id,
          attempt,
          topic: message.topic,
          timestamp: message.timestamp,
        });
        this.processed++;
        return;
      } catch (err) {
        lastError = err;
        console.warn(
          `[${this.groupId}] 消息 ${message.id} 第 ${attempt} 次重试失败: ${err.message}`
        );
        if (attempt < this.maxRetries) {
          // 指数退避
          await new Promise(r => setTimeout(r, this.retryDelay * Math.pow(2, attempt - 1)));
        }
      }
    }

    // 重试耗尽,移入死信队列
    this.failed++;
    const dlqMessage = {
      ...message,
      originalTopic: topic,
      errorMessage: lastError.message,
      failedAt: Date.now(),
      retryCount: this.maxRetries,
    };
    this.broker.produce(`${topic}.DLQ`, dlqMessage);
    this.deadLetters.push(dlqMessage);
    this.emit('message:dead', dlqMessage);
    console.error(`[${this.groupId}] 消息 ${message.id} 已移入 DLQ`);
  }

  // 获取统计信息
  getStats() {
    return {
      groupId: this.groupId,
      processed: this.processed,
      failed: this.failed,
      deadLetters: this.deadLetters.length,
      subscribedTopics: [...this.topics.keys()],
    };
  }
}

module.exports = { ConsumerGroup };

使用示例——模拟一个电商订单处理系统:

// example.js — 电商订单处理场景
const { Broker } = require('./broker');
const { ConsumerGroup } = require('./consumer-group');

const broker = new Broker({ numPartitions: 4 });

// 创建订单消费者组
const orderGroup = new ConsumerGroup(broker, 'order-processor', {
  maxRetries: 3,
  retryDelay: 500,
});

// 模拟不稳定的外部支付 API
const paymentAPI = async (order) => {
  if (Math.random() < 0.3) throw new Error('Payment gateway timeout');
  return { status: 'success', transactionId: `TXN-${Date.now()}` };
};

// 订阅订单主题
orderGroup.subscribe('orders', async (payload, meta) => {
  console.log(`处理订单 ${payload.orderId} (第 ${meta.attempt} 次)`);

  // 1. 调用支付接口
  const payment = await paymentAPI(payload);
  console.log(`  ✅ 支付成功: ${payment.transactionId}`);

  // 2. 更新库存(模拟)
  console.log(`  ✅ 库存已扣减: ${payload.items.length} 件商品`);

  // 3. 发送通知(模拟)
  console.log(`  ✅ 通知已发送: ${payload.customerEmail}`);
});

// 监听死信事件
orderGroup.on('message:dead', (msg) => {
  console.log(`  ❌ 订单 ${msg.payload.orderId} 进入死信队列`);
});

// 启动消费
orderGroup.start();

// 生产测试消息
for (let i = 1; i <= 10; i++) {
  broker.produce('orders', {
    orderId: `ORD-${String(i).padStart(5, '0')}`,
    customerEmail: `user${i}@example.com`,
    items: [{ sku: 'SKU-A', qty: i }],
    total: i * 99.9,
  });
}

// 5 秒后查看统计
setTimeout(() => {
  console.log('\n📊 消费统计:', JSON.stringify(orderGroup.getStats(), null, 2));
  orderGroup.stop();
  process.exit(0);
}, 5000);

📊 三、性能对比与生产级思考

3.1 方案对比

在选择消息队列方案时,你需要理解每种方案的核心差异:

特性 本实现(教学版) RabbitMQ Kafka Redis Streams
吞吐量(单机) ~5,000 msg/s ~30,000 msg/s ~100万 msg/s ~50,000 msg/s
持久化 文件追加 Erlang Mnesia 追加日志 + 分段 RDB/AOF
消费者组 ✅ 手动实现 ✅ 原生支持 ✅ 原生支持 ✅ 原生支持
消息顺序 单分区内有序 队列内有序 分区内有序 Stream 内有序
延迟(P99) ~5ms ~1ms ~5ms ~0.5ms
适用场景 学习与原型 复杂路由、RPC 大数据流、日志 轻量级队列
运维复杂度

⚡ **关键结论:**如果你的消息量在每秒 1000 条以下、不需要复杂的路由规则,Redis Streams 是最佳选择——运维简单、性能够用、原生支持消费者组。超过每秒 10,000 条再考虑 Kafka。

3.2 生产环境避坑指南

在实际项目中,消息队列踩坑最多的地方不是技术实现,而是设计决策

坑点 1:消息丢失三连击

消息丢失可能发生在三个环节:

  • 生产者发送失败就丢了 → ✅ 使用发送确认(Publisher Confirm),失败后重试
  • Broker 宕机内存消息丢失 → ✅ 开启持久化,syncWrites: true
  • 消费者处理到一半挂了 → ✅ 手动确认(Manual ACK),处理完再提交偏移量
// ❌ 错误写法:自动确认,消息可能丢失
const messages = broker.consume('orders', 'group1');
messages.forEach(msg => processMessage(msg)); // 处理失败就丢了

// ✅ 正确写法:手动确认,逐条处理
const messages = broker.consume('orders', 'group1');
for (const msg of messages) {
  try {
    await processMessage(msg);
    broker.ack('orders', 'group1', msg.partition, msg.offset); // 处理成功才确认
  } catch (err) {
    console.error('处理失败,等待重试:', err.message);
    break; // 不确认,下次会重新消费
  }
}

坑点 2:消息重复消费

网络抖动、消费者超时重启都会导致同一条消息被处理多次。解决方案是幂等性设计——让同一个操作执行多次和执行一次的效果相同:

// 幂等性设计:用数据库唯一索引去重
const processedMessages = new Set();

async function processMessageIdempotent(message) {
  if (processedMessages.has(message.id)) {
    console.log(`消息 ${message.id} 已处理,跳过`);
    return;
  }

  await saveOrder(message.payload);
  processedMessages.add(message.id);
}

坑点 3:消费者重平衡风暴

当一个消费者加入或离开组时,会触发重平衡(Rebalance)。在 Kafka 中,重平衡期间所有消费者停止消费。如果消费者处理时间过长,频繁超时触发重平衡,就会陷入「重平衡风暴」。

✅ 解决方案:缩短单次处理时间、增加会话超时时间、使用增量重平衡协议(Kafka 4.0+)。

3.3 如何选择消息队列?

根据你的实际场景做决策:

  • 原型验证、轻量级任务:Redis Streams 或本教学实现
  • 微服务间通信、RPC 调用:RabbitMQ(AMQP 协议成熟,路由灵活)
  • 大数据流处理、日志收集:Kafka(分区 + 副本 = 高吞吐 + 高可用)
  • 云原生、Serverless:AWS SQS / Google Pub/Sub(免运维)
  • 不要用 Redis List 做消息队列(LPUSH/BRPOP 没有消费者组、没有确认机制)
  • 不要用数据库表轮询做队列(性能差、锁竞争严重)

💡 提示:如果你的团队规模小、消息量不大,选择运维最简单的方案。消息队列本身不是瓶颈,运维复杂度才是。一个需要 3 人专职运维的 Kafka 集群,对小团队来说是负资产。

💡 总结

从零实现消息队列的过程,揭示了几个关键认知:

  1. 追加日志是一切的基础。Kafka 的高性能不是魔法,而是顺序 I/O + 零拷贝 + 批量处理的组合拳
  2. 消费者组解决了水平扩展问题,但也引入了重平衡的复杂度
  3. 死信队列不是可选功能,而是生产环境的必需品——没有 DLQ,你永远不知道哪些消息处理失败了
  4. 选择消息队列的标准不是吞吐量,而是运维成本和团队熟悉度

作为开发者,理解消息队列的内部原理不是为了造轮子,而是在出现生产事故时能快速定位问题。当你知道消息在 Broker 中是以追加日志存储的,你就能理解为什么「消费慢」不会导致 Broker 阻塞;当你知道消费者组的重平衡机制,你就能理解为什么「消费停顿了几秒钟」。

⚡ **关键结论:**消息队列是分布式系统的基石。先理解原理,再选择工具,最后在生产中持续优化。这个顺序不能反。

相关工具推荐:

  • 🔧 jsjson.com JSON 工具 — 调试消息 Payload 时格式化 JSON
  • 🔧 Redpanda — Kafka 的 C++ 替代品,运维更简单
  • 🔧 BullMQ — Node.js 生态最成熟的 Redis 任务队列
  • 🔧 NATS JetStream — 轻量级但功能完整的消息系统

📚 相关文章