从零构建 Node.js 优先级任务队列:延迟、重试、死信队列全解析

手把手用 Node.js 从零实现生产级任务队列,涵盖优先级调度、延迟任务、指数退避重试、死信队列、并发控制与持久化,附完整可运行代码和 BullMQ/pg-boss 性能对比。

后端开发 2026-06-07 15 分钟

你每天使用的每一个 Web 应用背后,几乎都有一个任务队列在默默工作——发送邮件、生成报表、处理图片、同步数据。根据 BullMQ 的 npm 周下载量已突破 200 万次,任务队列是后端开发中使用最广泛却最少被深入理解的基础设施之一。大多数开发者只会 npm install bullmq 然后调 API,对队列的内部调度机制、重试策略、并发控制原理一无所知。

本文不教你使用任何队列库。我们将从一个空的 queue.js 文件开始,用纯 Node.js 从零构建一个具备优先级调度、延迟任务、指数退避重试、死信队列和并发控制的生产级任务队列。理解了这些原理,你才能在生产环境中做出正确的架构决策。

🏗️ 一、核心架构:优先级队列与生产者-消费者模型

1.1 为什么需要任务队列?

在 Web 应用中,有些操作不适合在请求-响应周期内同步完成:

场景 同步处理的问题 队列化的优势
发送注册邮件 SMTP 调用耗时 2-5 秒,阻塞用户响应 立即返回响应,后台异步发送
生成 PDF 报表 CPU 密集型,阻塞事件循环 Worker 进程处理,主进程不阻塞
图片压缩/转码 内存占用大,可能 OOM 限制并发数,按优先级处理
第三方 API 同步 外部服务不稳定,需要重试 自动重试 + 指数退避 + 死信队列
数据库批量写入 高并发下连接池耗尽 削峰填谷,平滑写入速率

📌 记住: 任务队列的本质是异步+持久化+可控并发。没有持久化,进程崩溃就丢任务;没有并发控制,就可能打爆下游服务。

1.2 最小可行队列:从数组到优先级堆

最简单的队列就是一个数组,push 入队,shift 出队。但这有两个致命问题:

  • Array.shift() 的时间复杂度是 O(n),万级任务时性能急剧下降
  • 不支持优先级——所有任务平权,紧急任务无法插队

真正的生产级队列需要一个**二叉最小堆(Binary Min-Heap)**作为底层数据结构,O(log n) 的插入和删除,天然支持优先级排序:

// 优先级最小堆实现
class MinHeap {
  constructor(compareFn) {
    this.heap = [];
    this.compare = compareFn || ((a, b) => a.priority - b.priority);
  }

  // 获取父节点索引
  _parent(i) { return Math.floor((i - 1) / 2); }
  _left(i) { return 2 * i + 1; }
  _right(i) { return 2 * i + 2; }

  // 交换两个节点
  _swap(i, j) {
    [this.heap[i], this.heap[j]] = [this.heap[j], this.heap[i]];
  }

  // 上浮:插入新元素后恢复堆性质
  _siftUp(i) {
    while (i > 0) {
      const parent = this._parent(i);
      if (this.compare(this.heap[i], this.heap[parent]) >= 0) break;
      this._swap(i, parent);
      i = parent;
    }
  }

  // 下沉:删除堆顶后恢复堆性质
  _siftDown(i) {
    const n = this.heap.length;
    while (true) {
      let smallest = i;
      const left = this._left(i);
      const right = this._right(i);
      if (left < n && this.compare(this.heap[left], this.heap[smallest]) < 0) {
        smallest = left;
      }
      if (right < n && this.compare(this.heap[right], this.heap[smallest]) < 0) {
        smallest = right;
      }
      if (smallest === i) break;
      this._swap(i, smallest);
      i = smallest;
    }
  }

  insert(item) {
    this.heap.push(item);
    this._siftUp(this.heap.length - 1);
  }

  extractMin() {
    if (this.heap.length === 0) return null;
    const min = this.heap[0];
    const last = this.heap.pop();
    if (this.heap.length > 0) {
      this.heap[0] = last;
      this._siftDown(0);
    }
    return min;
  }

  peek() { return this.heap[0] || null; }
  get size() { return this.heap.length; }
}

⚠️ 警告: 不要直接用 Array.sort() 模拟优先级队列!每次插入后排序的时间复杂度是 O(n log n),而堆操作是 O(log n)。在高吞吐场景下,这个差距是数量级的。

⚙️ 二、完整队列实现:延迟、重试与并发控制

2.1 任务数据结构设计

一个生产级任务需要包含足够的元数据:

const crypto = require('crypto');

// 任务状态机
const JobStatus = {
  WAITING: 'waiting',     // 等待执行
  DELAYED: 'delayed',     // 延迟中
  ACTIVE: 'active',       // 执行中
  COMPLETED: 'completed', // 已完成
  FAILED: 'failed',       // 失败(可重试)
  DEAD: 'dead',           // 死信(超过最大重试次数)
};

class Job {
  constructor(name, data, options = {}) {
    this.id = options.id || crypto.randomUUID();
    this.name = name;
    this.data = data;
    this.priority = options.priority ?? 1000; // 数值越小,优先级越高
    this.status = JobStatus.WAITING;
    this.attempts = 0;
    this.maxAttempts = options.maxAttempts ?? 3;
    this.delay = options.delay ?? 0;          // 延迟毫秒数
    this.createdAt = Date.now();
    this.processedAt = null;
    this.completedAt = null;
    this.failedReason = null;
    this.result = null;
    // 计算下次执行时间(用于延迟任务和重试)
    this.nextRunAt = this.delay > 0
      ? this.createdAt + this.delay
      : this.createdAt;
  }
}

2.2 队列核心实现

这是整个系统的核心——一个支持优先级、延迟、重试和并发控制的队列:

const EventEmitter = require('events');

class JobQueue extends EventEmitter {
  constructor(options = {}) {
    super();
    this.name = options.name || 'default';
    this.concurrency = options.concurrency || 5;
    this.activeJobs = 0;
    this.running = false;

    // 三个核心数据结构
    this.waitingQueue = new MinHeap((a, b) => {
      // 先按优先级排序,优先级相同按创建时间排序
      if (a.priority !== b.priority) return a.priority - b.priority;
      return a.createdAt - b.createdAt;
    });
    this.delayedQueue = new MinHeap((a, b) => a.nextRunAt - b.nextRunAt);
    this.jobs = new Map(); // 所有任务的索引

    // 处理函数注册表
    this.processors = new Map();

    // 统计信息
    this.stats = { completed: 0, failed: 0, dead: 0 };

    // 定时器:检查延迟任务是否到期
    this._delayTimer = setInterval(() => this._processDelayed(), 100);
  }

  // 注册处理函数(支持按任务名注册不同处理器)
  process(name, handler) {
    if (typeof name === 'function') {
      handler = name;
      name = '*'; // 通配符:处理所有任务
    }
    this.processors.set(name, handler);
  }

  // 添加任务
  add(name, data, options = {}) {
    const job = new Job(name, data, options);
    this.jobs.set(job.id, job);

    if (job.delay > 0) {
      job.status = JobStatus.DELAYED;
      this.delayedQueue.insert(job);
    } else {
      this.waitingQueue.insert(job);
    }

    this.emit('waiting', job);
    this._schedule();
    return job;
  }

  // 调度执行(核心调度循环)
  _schedule() {
    if (!this.running) return;

    while (this.activeJobs < this.concurrency && this.waitingQueue.size > 0) {
      const job = this.waitingQueue.extractMin();
      if (!job) break;
      this._execute(job);
    }
  }

  // 执行单个任务
  async _execute(job) {
    this.activeJobs++;
    job.status = JobStatus.ACTIVE;
    job.processedAt = Date.now();
    job.attempts++;

    const handler = this.processors.get(job.name) || this.processors.get('*');
    if (!handler) {
      this.activeJobs--;
      job.status = JobStatus.FAILED;
      job.failedReason = `No processor registered for job: ${job.name}`;
      this._handleFailure(job, new Error(job.failedReason));
      return;
    }

    try {
      // 设置超时控制
      const timeout = job.options?.timeout || 30000;
      const result = await Promise.race([
        handler(job.data, job),
        this._createTimeout(timeout),
      ]);

      // 成功
      job.status = JobStatus.COMPLETED;
      job.result = result;
      job.completedAt = Date.now();
      this.stats.completed++;
      this.activeJobs--;
      this.emit('completed', job, result);
      this._schedule();
    } catch (error) {
      this.activeJobs--;
      this._handleFailure(job, error);
    }
  }

  // 处理失败:指数退避重试
  _handleFailure(job, error) {
    job.failedReason = error.message;

    if (job.attempts < job.maxAttempts) {
      // 指数退避:baseDelay * 2^(attempts-1) + 随机抖动
      const baseDelay = 1000;
      const exponentialDelay = baseDelay * Math.pow(2, job.attempts - 1);
      const jitter = Math.random() * 1000; // 随机抖动防止惊群效应
      job.nextRunAt = Date.now() + exponentialDelay + jitter;
      job.status = JobStatus.DELAYED;

      this.delayedQueue.insert(job);
      this.emit('retrying', job, job.attempts, job.maxAttempts);
      this._schedule();
    } else {
      // 超过最大重试次数,进入死信队列
      job.status = JobStatus.DEAD;
      this.stats.dead++;
      this.emit('dead', job, error);
      this._schedule();
    }

    this.stats.failed++;
    this.emit('failed', job, error, job.attempts);
  }

  // 处理延迟队列中到期的任务
  _processDelayed() {
    const now = Date.now();
    while (this.delayedQueue.size > 0 && this.delayedQueue.peek().nextRunAt <= now) {
      const job = this.delayedQueue.extractMin();
      if (job) {
        job.status = JobStatus.WAITING;
        this.waitingQueue.insert(job);
        this._schedule();
      }
    }
  }

  // 超时 Promise 工厂
  _createTimeout(ms) {
    return new Promise((_, reject) => {
      setTimeout(() => reject(new Error(`Job timed out after ${ms}ms`)), ms);
    });
  }

  // 启动队列
  start() {
    this.running = true;
    this._schedule();
  }

  // 停止队列(等待当前任务完成)
  async stop() {
    this.running = false;
    clearInterval(this._delayTimer);
    // 等待活跃任务完成
    while (this.activeJobs > 0) {
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }

  // 获取任务状态
  getJob(id) { return this.jobs.get(id); }
  getWaitingCount() { return this.waitingQueue.size; }
  getDelayedCount() { return this.delayedQueue.size; }
  getActiveCount() { return this.activeJobs; }
}

关键结论: 指数退避中的**随机抖动(Jitter)**不是装饰——它防止了「惊群效应」(Thundering Herd)。当 1000 个任务同时失败时,没有抖动它们会在同一时刻重试,再次打爆下游服务。加上随机抖动后,重试请求被均匀分散到时间窗口内。

2.3 完整使用示例

// 创建队列实例
const queue = new JobQueue({
  name: 'email-queue',
  concurrency: 3, // 最多同时处理 3 个任务
});

// 注册处理函数
queue.process('send-email', async (data) => {
  console.log(`📧 发送邮件给 ${data.to}...`);
  // 模拟 SMTP 发送(有时会失败)
  if (Math.random() < 0.3) throw new Error('SMTP connection timeout');
  await new Promise(resolve => setTimeout(resolve, 500));
  return { messageId: `msg_${Date.now()}` };
});

// 监听事件
queue.on('completed', (job, result) => {
  console.log(`✅ 任务 ${job.id} 完成,耗时 ${job.completedAt - job.processedAt}ms`);
});
queue.on('retrying', (job, attempt, max) => {
  console.log(`🔄 任务 ${job.id} 第 ${attempt}/${max} 次重试`);
});
queue.on('dead', (job, error) => {
  console.log(`💀 任务 ${job.id} 进入死信队列: ${error.message}`);
});

// 启动队列
queue.start();

// 添加普通任务
queue.add('send-email', { to: 'alice@example.com', subject: 'Welcome' });

// 添加高优先级任务(优先处理)
queue.add('send-email', { to: 'bob@example.com', subject: 'Reset Password' }, {
  priority: 1, // 数值越小优先级越高
});

// 添加延迟任务(30 秒后执行)
queue.add('send-email', { to: 'carol@example.com', subject: 'Follow Up' }, {
  delay: 30000,
});

// 添加自定义重试次数的任务
queue.add('send-email', { to: 'dave@example.com', subject: 'Invoice' }, {
  maxAttempts: 5, // 最多重试 5 次
  priority: 100,
});

📊 三、持久化、监控与生产级部署

3.1 为什么内存队列不够用?

上一节的实现有一个致命问题:所有数据都在内存中,进程重启就全丢了。在生产环境中,队列必须具备持久化能力。

常见的持久化方案有三种:

方案 吞吐量 可靠性 运维成本 适用场景
Redis + BullMQ ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐ 中(需 AOF) ⭐⭐⭐ 需运维 Redis 高吞吐、分布式部署
PostgreSQL + pg-boss ⭐⭐⭐ 中 ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐⭐ 无需额外组件 已有 PG、强一致性要求
SQLite ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐ 高 ⭐⭐⭐⭐⭐ 零运维 单机部署、中小规模
本节内存实现 ⭐⭐⭐⭐⭐ 极高 ⭐ 极低 ⭐⭐⭐⭐⭐ 零运维 开发/测试、可丢弃任务

💡 提示: 根据 BullMQ 作者的数据,Redis 方案在单机上可以达到 每秒 10,000+ 任务的吞吐量。但如果你的场景是每秒几百个任务,SQLite 或 PostgreSQL 完全够用,还能省去 Redis 的运维成本。

3.2 SQLite 持久化层实现

下面为队列添加 SQLite 持久化,让任务在进程重启后不丢失:

const Database = require('better-sqlite3');

class SQLiteStorage {
  constructor(dbPath = ':memory:') {
    this.db = new Database(dbPath);
    this.db.pragma('journal_mode = WAL'); // WAL 模式提升并发性能
    this._init();
  }

  _init() {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS jobs (
        id TEXT PRIMARY KEY,
        name TEXT NOT NULL,
        data TEXT NOT NULL,
        priority INTEGER DEFAULT 1000,
        status TEXT DEFAULT 'waiting',
        attempts INTEGER DEFAULT 0,
        max_attempts INTEGER DEFAULT 3,
        next_run_at INTEGER NOT NULL,
        created_at INTEGER NOT NULL,
        processed_at INTEGER,
        completed_at INTEGER,
        failed_reason TEXT,
        result TEXT
      );
      CREATE INDEX IF NOT EXISTS idx_status_priority
        ON jobs(status, priority, created_at);
      CREATE INDEX IF NOT EXISTS idx_next_run
        ON jobs(next_run_at) WHERE status = 'delayed';
    `);

    // 预编译常用 SQL 语句
    this._insertStmt = this.db.prepare(`
      INSERT INTO jobs (id, name, data, priority, status, max_attempts,
                        next_run_at, created_at)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    `);
    this._updateStatusStmt = this.db.prepare(`
      UPDATE jobs SET status = ?, processed_at = ?, completed_at = ?,
                     failed_reason = ?, result = ?, attempts = ?,
                     next_run_at = ?
      WHERE id = ?
    `);
  }

  saveJob(job) {
    this._insertStmt.run(
      job.id, job.name, JSON.stringify(job.data),
      job.priority, job.status, job.maxAttempts,
      job.nextRunAt, job.createdAt
    );
  }

  updateJob(job) {
    this._updateStatusStmt.run(
      job.status, job.processedAt, job.completedAt,
      job.failedReason, job.result ? JSON.stringify(job.result) : null,
      job.attempts, job.nextRunAt, job.id
    );
  }

  // 恢复未完成的任务(进程重启时调用)
  recoverJobs() {
    // 将中断的 active 任务重置为 waiting
    this.db.prepare(`
      UPDATE jobs SET status = 'waiting', attempts = attempts
      WHERE status = 'active'
    `).run();

    // 返回所有等待中和延迟中的任务
    return this.db.prepare(`
      SELECT * FROM jobs WHERE status IN ('waiting', 'delayed')
      ORDER BY priority ASC, created_at ASC
    `).all().map(row => ({
      ...row,
      data: JSON.parse(row.data),
      result: row.result ? JSON.parse(row.result) : null,
    }));
  }
}

3.3 重试策略对比与选择

指数退避不是唯一的重试策略。不同场景需要不同的退避方案:

策略 公式 适用场景 示例间隔(4次重试)
固定间隔 delay 网络抖动、临时不可用 1s, 1s, 1s, 1s
线性递增 delay * attempt 渐进式后退 1s, 2s, 3s, 4s
指数退避 delay * 2^attempt API 限流、服务过载 1s, 2s, 4s, 8s
指数+抖动 delay * 2^attempt + rand 大规模并发重试 1.3s, 2.7s, 4.1s, 8.5s
退避+上限 min(delay * 2^attempt, max) 防止间隔过长 1s, 2s, 4s, 8s(上限 10s)

📌 记住: 永远不要用固定间隔重试处理大规模并发场景。AWS 的官方最佳实践明确推荐「指数退避 + 随机抖动」(Exponential Backoff with Jitter),这能将重试请求的峰值降低 50% 以上。

在代码中实现可配置的重试策略:

const retryStrategies = {
  // 固定间隔
  fixed: (baseDelay) => () => baseDelay,

  // 线性递增
  linear: (baseDelay) => (attempt) => baseDelay * attempt,

  // 指数退避
  exponential: (baseDelay) => (attempt) => baseDelay * Math.pow(2, attempt - 1),

  // 指数退避 + 随机抖动(推荐)
  exponentialJitter: (baseDelay) => (attempt) => {
    const exp = baseDelay * Math.pow(2, attempt - 1);
    const jitter = Math.random() * baseDelay;
    return exp + jitter;
  },

  // 指数退避 + 上限
  exponentialCap: (baseDelay, maxDelay = 30000) => (attempt) => {
    return Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay);
  },
};

// 在队列中使用
class JobQueueWithRetry extends JobQueue {
  constructor(options = {}) {
    super(options);
    this.retryStrategy = options.retryStrategy || retryStrategies.exponentialJitter;
    this.baseDelay = options.baseDelay || 1000;
  }

  _handleFailure(job, error) {
    job.failedReason = error.message;

    if (job.attempts < job.maxAttempts) {
      const delay = this.retryStrategy(this.baseDelay)(job.attempts);
      job.nextRunAt = Date.now() + delay;
      job.status = JobStatus.DELAYED;
      this.delayedQueue.insert(job);
      this.emit('retrying', job, job.attempts, job.maxAttempts, delay);
      this._schedule();
    } else {
      job.status = JobStatus.DEAD;
      this.stats.dead++;
      this.emit('dead', job, error);
      this._schedule();
    }

    this.stats.failed++;
    this.emit('failed', job, error, job.attempts);
  }
}

3.4 与现有方案的对比

在决定是自建还是使用现有方案时,需要考虑以下因素:

维度 自建队列(本文) BullMQ pg-boss Cloudflare Queues
依赖 零依赖 Redis PostgreSQL Cloudflare 平台
部署复杂度 ⭐ 极低 ⭐⭐⭐ 需 Redis ⭐⭐ 需 PG ⭐⭐⭐⭐ 需 CF
分布式支持 ❌ 单机 ✅ 原生支持 ✅ 基于 PG ✅ 全球分布
延迟任务
优先级 ❌ 需手动实现 ❌ FIFO
重试策略 ✅ 可配置 ✅ 内置 ✅ 内置 ✅ 内置
仪表盘 ❌ 需自建 ✅ Bull Board ✅ CF Dashboard
适用场景 学习/轻量/嵌入 大规模生产 已有 PG 栈 无服务器架构

关键结论: 自建队列的最大价值不是替代 BullMQ,而是理解原理。当你知道优先级堆、指数退避、并发控制的实现细节后,你才能在生产环境中快速定位队列相关的性能瓶颈和可靠性问题。

🎯 四、生产环境避坑指南

4.1 常见陷阱

在将任务队列部署到生产环境时,以下是最常踩的坑:

  • 不设置任务超时 — 一个卡死的任务会长期占用 Worker,最终耗尽并发槽位
  • 不做幂等设计 — 重试机制保证「至少一次」执行,非幂等操作会导致数据重复
  • 忽略死信队列监控 — 死信队列积压意味着有系统性问题,必须及时告警
  • 任务数据过大 — 将整个文件内容塞进任务数据会导致内存膨胀,应该传引用(URL/ID)
  • 没有优雅关闭 — 强制 kill 进程会丢失正在执行的任务,必须监听 SIGTERM 并等待完成

推荐做法:

  • ✅ 所有任务处理函数必须是幂等的(同样的输入执行多次结果一致)
  • ✅ 设置合理的超时时间(邮件 30s,报表 5min,API 调用 10s)
  • ✅ 监控死信队列的积压量和增长率,设置告警阈值
  • ✅ 大数据走对象存储,任务中只传引用
  • ✅ 实现优雅关闭SIGTERM 处理,等待活跃任务完成)

4.2 幂等性实现模式

既然重试机制保证「至少一次」投递,任务处理函数必须是幂等的:

// ❌ 错误写法:非幂等的扣款操作
async function deductBalance(userId, amount) {
  const user = await db.getUser(userId);
  await db.updateUser(userId, { balance: user.balance - amount });
  // 重试时会重复扣款!
}

// ✅ 正确写法:使用幂等键
async function deductBalance(userId, amount, idempotencyKey) {
  // 检查是否已处理过
  const existing = await db.getIdempotencyRecord(idempotencyKey);
  if (existing) return existing.result;

  // 使用数据库事务保证原子性
  const result = await db.transaction(async (tx) => {
    const user = await tx.getUser(userId);
    if (user.balance < amount) throw new Error('Insufficient balance');
    await tx.updateUser(userId, { balance: user.balance - amount });
    return { newBalance: user.balance - amount };
  });

  // 记录幂等键
  await db.saveIdempotencyRecord(idempotencyKey, result);
  return result;
}

📝 总结

通过从零构建任务队列,我们深入理解了以下核心概念:

  1. 二叉最小堆 — 优先级队列的底层数据结构,O(log n) 的插入和删除
  2. 指数退避 + 随机抖动 — 防止重试风暴的最佳策略
  3. 死信队列 — 超过最大重试次数的任务的最终归宿,是系统健康的重要指标
  4. 并发控制 — 保护下游服务不被打爆的关键机制
  5. 幂等性 — 重试机制的必要配套,没有幂等就没有可靠的重试

对于不同规模的项目,我的建议是:

  • 学习/原型阶段:用本文的自建实现,理解原理
  • 单机生产环境:SQLite + WAL 模式,零运维成本
  • 分布式生产环境:BullMQ(Redis)或 pg-boss(PostgreSQL)
  • 无服务器架构:Cloudflare Queues 或 AWS SQS

💡 提示: 如果你在 jsjson.com 的 JSON 格式化工具中处理超大 JSON 文件,也可以借鉴任务队列的思路——将文件分块后放入内存队列,用 Worker 并行处理,最后合并结果。

🔗 相关工具推荐

📚 相关文章