AI Agent 并发执行引擎实战:多任务并行、Worker Pool 与分布式调度

深入解析 AI Agent 从串行到并行的架构升级,涵盖 Worker Pool 模式、任务队列集成、分布式 Agent 编排、并发成本控制与错误隔离,附完整 TypeScript 实现与生产避坑指南。

开发者效率 2026-06-12 20 分钟

2026 年,AI Agent 已经从「单轮对话机器人」进化为能执行多步骤复杂任务的自主系统。但当你的产品需要同时处理 100 个用户请求、每个请求触发一个 Agent 执行 5-10 步工具调用时,串行执行模式立刻成为瓶颈。根据 LangChain 2026 Q2 的生产数据,采用并发 Agent 架构的系统,其吞吐量是串行架构的 8-15 倍,而 P99 延迟降低了 60%。 问题是,并发不只是「多开几个 Promise」那么简单——你需要处理任务调度、错误隔离、成本控制和分布式协调等一系列工程挑战。本文将从零构建一个生产级的 Agent 并发执行引擎。

🏗️ 一、为什么串行 Agent 架构撑不住生产流量

1.1 串行执行的三重瓶颈

一个典型的 Agent 执行流程是:接收用户输入 → LLM 推理 → 调用工具 → LLM 再推理 → 返回结果。单次执行耗时 3-15 秒,如果 100 个用户同时发起请求,串行处理意味着最后一个用户要等 5-25 分钟

更隐蔽的问题是资源争用:当多个 Agent 共享同一个 LLM API Key 时,串行请求无法充分利用 API 的并发配额;当 Agent 调用外部工具(数据库、搜索引擎)时,I/O 等待时间白白浪费。

// ❌ 串行执行:每个 Agent 依次等待
async function runAgentsSerial(tasks: AgentTask[]): Promise<AgentResult[]> {
  const results: AgentResult[] = [];
  for (const task of tasks) {
    const result = await runAgent(task); // 每次等 3-15 秒
    results.push(result);
  }
  return results; // 100 个任务 = 300-1500 秒
}
// ✅ 并发执行:利用 Promise.allSettled 并行处理
async function runAgentsConcurrent(tasks: AgentTask[]): Promise<AgentResult[]> {
  const results = await Promise.allSettled(
    tasks.map(task => runAgent(task))
  );
  return results.map(r => r.status === 'fulfilled' ? r.value : { error: r.reason });
  // 100 个任务 ≈ 3-15 秒(取决于最慢的那个)
}

⚠️ 警告:裸用 Promise.allSettled 处理 100+ 并发 Agent 会导致 LLM API 限流(Rate Limit)、内存暴涨和无法追踪的错误。你需要一个有并发控制队列管理错误隔离的执行引擎。

1.2 并发 Agent 的五大工程挑战

挑战 串行架构 并发架构需求
并发控制 无(天然串行) 信号量/Semaphore 限制并发数
错误隔离 一个失败全部停 单个失败不影响其他任务
成本控制 简单累加 实时 Token 预算 + 熔断
状态管理 线性状态 每个 Agent 独立状态空间
可观测性 简单日志 分布式 Trace + 聚合指标

🔧 二、从零构建 Agent Worker Pool 执行引擎

2.1 核心架构:Semaphore + Queue + Isolation

一个生产级的 Agent 并发引擎需要三个核心组件:

  1. 并发控制器(Semaphore):限制同时运行的 Agent 数量,防止 LLM API 限流
  2. 任务队列(Queue):缓冲待执行的任务,支持优先级和重试
  3. 错误隔离(Isolation):每个 Agent 在独立的 try-catch 中运行,互不影响
// agent-worker-pool.ts — Agent 并发执行引擎核心实现
import { EventEmitter } from 'events';

interface AgentTask {
  id: string;
  input: string;
  priority: number;       // 1-10,数字越大优先级越高
  maxRetries: number;
  tokenBudget: number;    // 单任务 Token 上限
  timeout: number;        // 单任务超时(ms)
}

interface AgentResult {
  taskId: string;
  output: string;
  tokensUsed: number;
  duration: number;
  retries: number;
  error?: string;
}

interface PoolConfig {
  concurrency: number;     // 最大并发数
  globalTokenBudget: number; // 全局 Token 预算
  rateLimitRPM: number;    // 每分钟最大请求数
  taskTimeout: number;     // 默认任务超时
}

class AgentWorkerPool extends EventEmitter {
  private queue: AgentTask[] = [];
  private running = new Map<string, Promise<AgentResult>>();
  private semaphore: number;
  private config: PoolConfig;
  private totalTokensUsed = 0;
  private requestTimestamps: number[] = [];
  private aborted = false;

  constructor(config: PoolConfig) {
    super();
    this.config = config;
    this.semaphore = config.concurrency;
  }

  // 提交任务到队列
  submit(task: AgentTask): void {
    if (this.aborted) throw new Error('Pool has been shut down');
    this.queue.push(task);
    this.queue.sort((a, b) => b.priority - a.priority); // 高优先级排前面
    this.emit('task:queued', task);
    this.processQueue();
  }

  // 批量提交并等待所有完成
  async submitAll(tasks: AgentTask[]): Promise<AgentResult[]> {
    const promises = tasks.map(task => {
      return new Promise<AgentResult>((resolve) => {
        this.submit(task);
        this.once(`task:done:${task.id}`, resolve);
      });
    });
    return Promise.all(promises);
  }

  // 核心调度逻辑
  private async processQueue(): Promise<void> {
    while (this.semaphore > 0 && this.queue.length > 0 && !this.aborted) {
      const task = this.queue.shift()!;
      this.semaphore--;
      const promise = this.executeWithIsolation(task);
      this.running.set(task.id, promise);
      
      promise.finally(() => {
        this.running.delete(task.id);
        this.semaphore++;
        this.emit('task:done', task.id);
        this.processQueue(); // 递归处理下一个
      });
    }
  }

  // 带错误隔离的任务执行
  private async executeWithIsolation(task: AgentTask): Promise<AgentResult> {
    const startTime = Date.now();
    let retries = 0;

    while (retries <= task.maxRetries) {
      try {
        // 检查全局 Token 预算
        if (this.totalTokensUsed >= this.config.globalTokenBudget) {
          throw new Error('Global token budget exhausted');
        }

        // 检查速率限制
        await this.enforceRateLimit();

        // 带超时的 Agent 执行
        const result = await this.executeWithTimeout(task);
        
        this.totalTokensUsed += result.tokensUsed;
        this.emit('task:completed', result);
        this.emit(`task:done:${task.id}`, result);
        return result;

      } catch (error) {
        retries++;
        if (retries > task.maxRetries) {
          const result: AgentResult = {
            taskId: task.id,
            output: '',
            tokensUsed: 0,
            duration: Date.now() - startTime,
            retries: retries - 1,
            error: (error as Error).message,
          };
          this.emit('task:failed', result);
          this.emit(`task:done:${task.id}`, result);
          return result;
        }
        // 指数退避重试
        await this.sleep(Math.min(1000 * Math.pow(2, retries), 10000));
        this.emit('task:retry', { taskId: task.id, attempt: retries });
      }
    }

    throw new Error('Unreachable');
  }

  // 超时控制
  private async executeWithTimeout(task: AgentTask): Promise<AgentResult> {
    return Promise.race([
      this.runAgent(task),
      new Promise<never>((_, reject) =>
        setTimeout(() => reject(new Error(`Task ${task.id} timed out after ${task.timeout}ms`)), task.timeout)
      ),
    ]);
  }

  // 速率限制(令牌桶)
  private async enforceRateLimit(): Promise<void> {
    const now = Date.now();
    this.requestTimestamps = this.requestTimestamps.filter(t => now - t < 60000);
    
    if (this.requestTimestamps.length >= this.config.rateLimitRPM) {
      const oldestInWindow = this.requestTimestamps[0];
      const waitTime = 60000 - (now - oldestInWindow) + 100; // +100ms buffer
      await this.sleep(waitTime);
    }
    
    this.requestTimestamps.push(Date.now());
  }

  // 实际的 Agent 执行逻辑(接入你的 LLM + Tools)
  private async runAgent(task: AgentTask): Promise<AgentResult> {
    const startTime = Date.now();
    // 这里接入你的 Agent 逻辑,例如:
    // const agent = new Agent({ model: 'claude-sonnet-4-20250514', tools: [...] });
    // const output = await agent.run(task.input);
    
    // 模拟执行
    const output = `Agent completed: ${task.input}`;
    const tokensUsed = Math.floor(Math.random() * 1000) + 200;
    
    return {
      taskId: task.id,
      output,
      tokensUsed,
      duration: Date.now() - startTime,
      retries: 0,
    };
  }

  // 优雅关闭
  async shutdown(): Promise<void> {
    this.aborted = true;
    // 等待所有正在运行的任务完成
    await Promise.allSettled(Array.from(this.running.values()));
    this.emit('pool:shutdown');
  }

  // 获取运行状态
  getStats() {
    return {
      queued: this.queue.length,
      running: this.running.size,
      availableSlots: this.semaphore,
      totalTokensUsed: this.totalTokensUsed,
      rpm: this.requestTimestamps.filter(t => Date.now() - t < 60000).length,
    };
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

💡 **提示:**上述实现中,Semaphore 用简单的数字递减/递增实现,比 p-limit 等第三方库更透明,且支持动态调整并发数。生产环境中建议增加 Prometheus metrics 导出。

2.2 使用示例:批量处理用户请求

// usage.ts — 批量 Agent 任务处理示例
const pool = new AgentWorkerPool({
  concurrency: 10,           // 最多 10 个 Agent 同时运行
  globalTokenBudget: 500000, // 全局 50 万 Token 预算
  rateLimitRPM: 200,         // 每分钟最多 200 次 API 调用
  taskTimeout: 30000,        // 单任务 30 秒超时
});

// 监听事件
pool.on('task:completed', (result) => {
  console.log(`✅ Task ${result.taskId} completed in ${result.duration}ms, ${result.tokensUsed} tokens`);
});

pool.on('task:failed', (result) => {
  console.error(`❌ Task ${result.taskId} failed after ${result.retries} retries: ${result.error}`);
});

// 提交 100 个任务
const tasks: AgentTask[] = Array.from({ length: 100 }, (_, i) => ({
  id: `task-${i}`,
  input: `分析用户 #${i} 的行为数据并生成推荐`,
  priority: i < 10 ? 10 : 5,      // 前 10 个高优先级
  maxRetries: 2,
  tokenBudget: 5000,
  timeout: 15000,
}));

const results = await pool.submitAll(tasks);
console.log(`\n📊 完成统计:`, pool.getStats());

🌐 三、分布式 Agent 调度:跨节点的并发编排

3.1 从单机到分布式的架构演进

当单机 Worker Pool 的并发能力触顶(通常受限于内存和单进程 CPU),你需要将 Agent 任务分发到多个节点。核心方案是消息队列 + 分布式 Worker

方案 适用场景 优势 劣势
BullMQ + Redis 中等规模,10-100 并发 成熟稳定,支持优先级/延迟/重试 依赖 Redis
RabbitMQ 大规模,100-1000+ 并发 企业级消息路由,支持复杂拓扑 运维复杂度高
Redis Streams 轻量级,5-50 并发 无额外依赖,原生 Redis 功能 功能较简单
Temporal 需要持久化工作流 强一致性,自动重试,可视化 学习曲线陡峭

3.2 BullMQ 集成:生产级分布式 Agent 调度

// agent-bullmq-worker.ts — BullMQ 分布式 Agent Worker
import { Worker, Queue, Job } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });

// 定义 Agent 任务队列
const agentQueue = new Queue('agent-tasks', { connection });

// Agent Worker:消费队列中的任务
const agentWorker = new Worker(
  'agent-tasks',
  async (job: Job) => {
    const { input, priority, tokenBudget, timeout } = job.data;
    
    // 更新进度
    await job.updateProgress({ stage: 'reasoning', step: 1 });
    
    // 执行 Agent(带超时控制)
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), timeout);
    
    try {
      const result = await executeAgent(input, {
        tokenBudget,
        signal: controller.signal,
        onProgress: (progress) => job.updateProgress(progress),
      });
      
      clearTimeout(timer);
      return result;
    } catch (error) {
      clearTimeout(timer);
      throw error; // BullMQ 会根据 retry 配置自动重试
    }
  },
  {
    connection,
    concurrency: 5,                // 每个 Worker 进程 5 并发
    limiter: {                     // 速率限制
      max: 50,
      duration: 60000,             // 每分钟最多 50 个任务
    },
    settings: {
      backoffStrategy: (attemptsMade: number) => {
        return Math.min(1000 * Math.pow(2, attemptsMade), 30000); // 指数退避
      },
    },
  }
);

// 添加任务示例
await agentQueue.add('analyze', {
  input: '分析销售数据',
  priority: 1,
  tokenBudget: 5000,
  timeout: 15000,
}, {
  priority: 1,              // BullMQ 优先级(数字越小越优先)
  attempts: 3,              // 最多重试 3 次
  backoff: { type: 'exponential', delay: 2000 },
  removeOnComplete: 1000,   // 保留最近 1000 个完成的任务
  removeOnFail: 5000,       // 保留最近 5000 个失败的任务
});

// 事件监听
agentWorker.on('completed', (job) => {
  console.log(`✅ Job ${job.id} completed`);
});

agentWorker.on('failed', (job, err) => {
  console.error(`❌ Job ${job?.id} failed: ${err.message}`);
});

📌 **记住:**分布式 Agent 系统中,幂等性是第一优先级。由于网络抖动和 Worker 重启,同一个任务可能被执行多次。确保你的 Agent 工具调用是幂等的——例如,写入数据库时使用 INSERT ... ON CONFLICT DO UPDATE 而非裸 INSERT

3.3 动态扩缩容:根据队列深度自动调整

// auto-scaler.ts — 根据队列深度动态调整 Worker 并发数
async function autoScaleWorker(worker: Worker, queue: Queue, config: {
  minConcurrency: number;
  maxConcurrency: number;
  scaleUpThreshold: number;   // 队列深度超过此值时扩容
  scaleDownThreshold: number; // 队列深度低于此值时缩容
  checkInterval: number;      // 检查间隔(ms)
}) {
  let currentConcurrency = config.minConcurrency;
  
  setInterval(async () => {
    const waiting = await queue.getWaitingCount();
    const active = await queue.getActiveCount();
    
    if (waiting > config.scaleUpThreshold && currentConcurrency < config.maxConcurrency) {
      currentConcurrency = Math.min(currentConcurrency + 2, config.maxConcurrency);
      await worker.concurrency(currentConcurrency);
      console.log(`📈 Scaled UP to ${currentConcurrency} (queue: ${waiting} waiting)`);
    } else if (waiting < config.scaleDownThreshold && currentConcurrency > config.minConcurrency) {
      currentConcurrency = Math.max(currentConcurrency - 1, config.minConcurrency);
      await worker.concurrency(currentConcurrency);
      console.log(`📉 Scaled DOWN to ${currentConcurrency} (queue: ${waiting} waiting)`);
    }
  }, config.checkInterval);
}

💰 四、并发成本控制:防止 Token 预算爆炸

4.1 并发场景下的成本失控风险

串行执行时,成本是可预测的——每个任务消耗 500-2000 Token,100 个任务就是 5-20 万 Token。但并发执行时,重试风暴可能让成本翻 3-5 倍:当 LLM API 返回 429 (Rate Limit) 时,所有并发任务同时触发重试,瞬间消耗大量 Token。

// cost-guard.ts — 并发 Agent 成本控制中间件
class CostGuard {
  private totalTokens = 0;
  private budget: number;
  private circuitOpen = false;
  private failureCount = 0;
  private readonly failureThreshold = 5;
  private readonly resetTimeout = 30000;

  constructor(budget: number) {
    this.budget = budget;
  }

  // 检查是否允许执行(熔断器模式)
  canExecute(estimatedTokens: number): boolean {
    if (this.circuitOpen) return false;
    if (this.totalTokens + estimatedTokens > this.budget) return false;
    return true;
  }

  // 记录 Token 消耗
  recordUsage(tokens: number): void {
    this.totalTokens += tokens;
  }

  // 记录失败(用于熔断器)
  recordFailure(): void {
    this.failureCount++;
    if (this.failureCount >= this.failureThreshold) {
      this.circuitOpen = true;
      console.warn('🔴 Circuit breaker OPEN — stopping all agent execution');
      setTimeout(() => {
        this.circuitOpen = false;
        this.failureCount = 0;
        console.log('🟢 Circuit breaker RESET — resuming execution');
      }, this.resetTimeout);
    }
  }

  recordSuccess(): void {
    this.failureCount = Math.max(0, this.failureCount - 1);
  }

  getStats() {
    return {
      totalTokens: this.totalTokens,
      budgetRemaining: this.budget - this.totalTokens,
      budgetUsedPercent: ((this.totalTokens / this.budget) * 100).toFixed(1) + '%',
      circuitOpen: this.circuitOpen,
    };
  }
}

⚠️ 警告:并发 Agent 的最大成本风险不是单次调用,而是重试风暴。当 LLM API 出现间歇性故障时,N 个并发 Agent × M 次重试 = N×M 倍的 Token 消耗。务必在 Worker 层面设置全局熔断器,而非仅在单个 Agent 内部处理。

4.2 Token 预算分配策略

策略 适用场景 实现方式
固定预算 任务复杂度已知 每个任务分配固定 Token 额度
动态预算 任务复杂度未知 根据前几步消耗动态调整后续预算
优先级预算 有高低优先级混合 高优先级任务可「借用」低优先级额度
全局熔断 成本敏感场景 全局 Token 耗尽后停止所有新任务

🛡️ 五、生产避坑指南与最佳实践

5.1 六个常见的并发 Agent 陷阱

  1. 裸用 Promise.all 不限制并发数 — 当任务量达到 1000+ 时,会同时创建 1000 个 LLM 连接,导致内存暴涨和 API 限流
  2. 错误处理放在 Agent 内部 — Agent 内部的 try-catch 无法防止外部的队列堆积和 Worker 崩溃
  3. 共享可变状态 — 多个 Agent 并发修改同一个数据库记录会导致数据竞争
  4. 不做超时控制 — 一个卡住的 Agent 会永久占用 Worker 槽位
  5. 忽略指数退避 — 所有 Agent 同时重试同一个失败的 API 会产生「惊群效应」
  6. 不监控队列深度 — 队列积压到 10000+ 才发现,已造成大量超时

5.2 生产检查清单

  • ✅ 设置合理的 Worker 并发数(建议:LLM API RPM / 平均单任务耗时 × 60)
  • ✅ 每个任务设置独立的超时(建议:P95 耗时 × 2)
  • ✅ 实现全局 Token 预算 + 熔断器
  • ✅ 使用指数退避 + 随机抖动(Jitter)避免重试风暴
  • ✅ Agent 工具调用实现幂等性
  • ✅ 监控队列深度、Worker 利用率、Token 消耗速率
  • ✅ 设置死信队列(Dead Letter Queue)收集持续失败的任务
  • ✅ 优雅关闭:收到 SIGTERM 时等待正在运行的任务完成

📝 总结

AI Agent 并发执行不是「多开几个 Promise」的简单问题,而是一个涉及调度、隔离、成本控制和分布式协调的系统工程。核心要点:

  1. Semaphore 控制并发:用信号量限制同时运行的 Agent 数量,避免 API 限流
  2. 错误隔离:每个 Agent 在独立的执行上下文中运行,单个失败不影响整体
  3. 成本熔断:全局 Token 预算 + 熔断器模式,防止重试风暴导致成本失控
  4. 分布式队列:超过单机能力时,用 BullMQ/RabbitMQ 将任务分发到多个 Worker 节点
  5. 可观测性:实时监控队列深度、Worker 利用率、Token 消耗和错误率

关键结论:并发 Agent 系统的成功不取决于并发数有多高,而取决于错误隔离和成本控制有多完善。一个能优雅处理失败、精确控制成本的 10 并发系统,远比一个失控的 100 并发系统更有价值。

🔗 相关工具推荐

  • BullMQ:基于 Redis 的高性能任务队列,支持优先级、延迟、重试和死信队列
  • Temporal:持久化工作流引擎,适合需要强一致性的长时 Agent 任务
  • p-limit:轻量级 Promise 并发控制库(适合简单场景)
  • Langfuse:LLM 应用可观测性平台,支持 Token 追踪和 Agent Trace
  • Inngest:无服务器任务编排平台,内置 Agent 执行支持

📚 相关文章